From 7367d823e867f6c96116fe7f23cbd5845fa537b1 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 25 Mar 2026 22:45:41 +0000 Subject: [PATCH 1/9] feat(cdk): enable fail-fast shutdown on memory threshold with dual-condition check Add critical memory threshold (95% cgroup + 80% process RSS) that raises AirbyteTracedException with FailureType.system_error when both conditions are met. This gives connectors a clean error message instead of an opaque OOM kill. Key design decisions: - Dual-condition: only raises when BOTH cgroup >= 95% AND process RSS >= 80% of container limit. This avoids false positives from reclaimable kernel page cache (e.g., SQLite file cache). - Process RSS read from /proc/self/status (no new dependencies). - Kill switch: AIRBYTE_MEMORY_FAIL_FAST env var (default: true, set to 'false' to disable). - Failure-path contract: entrypoint read loop wrapped in try/finally to flush queued state messages before exception propagates. Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/15982 Co-Authored-By: bot_apk --- airbyte_cdk/entrypoint.py | 16 ++- airbyte_cdk/utils/memory_monitor.py | 111 ++++++++++++++-- unit_tests/utils/test_memory_monitor.py | 161 ++++++++++++++++++++++++ 3 files changed, 275 insertions(+), 13 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index d6c92f9fa..27079af3d 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -279,11 +279,17 @@ def read( # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) - for message in self.source.read(self.logger, config, catalog, state): - yield self.handle_record_counts(message, stream_message_counter) - self._memory_monitor.check_memory_usage() - for message in self._emit_queued_messages(self.source): - yield self.handle_record_counts(message, stream_message_counter) + try: + for message in self.source.read(self.logger, config, catalog, state): + yield self.handle_record_counts(message, stream_message_counter) + self._memory_monitor.check_memory_usage() + finally: + # Flush queued messages (state checkpoints, logs) regardless of whether + # the read completed normally or was interrupted by a memory fail-fast + # exception. This ensures the platform receives the last committed state + # so the next sync can resume from the correct checkpoint. + for message in self._emit_queued_messages(self.source): + yield self.handle_record_counts(message, stream_message_counter) @staticmethod def handle_record_counts( diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 0767ce3bf..33fba9bf7 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -2,12 +2,16 @@ # Copyright (c) 2026 Airbyte, Inc., all rights reserved. # -"""Source-side memory introspection to log memory usage approaching container limits.""" +"""Source-side memory introspection with fail-fast shutdown on memory threshold.""" import logging +import os from pathlib import Path from typing import Optional +from airbyte_cdk.models import FailureType +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + logger = logging.getLogger("airbyte") # cgroup v2 paths @@ -18,28 +22,72 @@ _CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes") _CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes") +# Process-level RSS from /proc/self/status (Linux only, no extra dependency) +_PROC_SELF_STATUS = Path("/proc/self/status") + # Log when usage is at or above 90% _MEMORY_THRESHOLD = 0.90 +# Raise AirbyteTracedException when BOTH conditions are met: +# 1. cgroup usage >= critical threshold +# 2. process RSS >= RSS threshold of the container limit +# This dual-condition avoids false positives from reclaimable kernel page cache. +_CRITICAL_THRESHOLD = 0.95 +_RSS_THRESHOLD = 0.80 + # Check interval (every N messages) _DEFAULT_CHECK_INTERVAL = 5000 +# Environment variable to disable fail-fast (set to "false" to disable) +_ENV_FAIL_FAST = "AIRBYTE_MEMORY_FAIL_FAST" + + +def _read_process_rss_bytes() -> Optional[int]: + """Read current process RSS from /proc/self/status. + + Returns RSS in bytes, or None if unavailable (non-Linux, permission error, etc.). + """ + try: + status_text = _PROC_SELF_STATUS.read_text() + for line in status_text.splitlines(): + if line.startswith("VmRSS:"): + # Format: "VmRSS: 12345 kB" + parts = line.split() + if len(parts) >= 2: + return int(parts[1]) * 1024 # Convert kB to bytes + return None + except (OSError, ValueError): + return None + class MemoryMonitor: - """Monitors container memory usage via cgroup files and logs warnings when usage is high. + """Monitors container memory usage via cgroup files and raises on critical pressure. Lazily probes cgroup v2 then v1 files on the first call to ``check_memory_usage()``. Caches which version exists. If neither is found (local dev / CI), all subsequent calls are instant no-ops. - Logs a WARNING on every check interval (default 5000 messages) when memory - usage is at or above 90% of the container limit. This gives breadcrumb - trails showing whether memory is climbing, plateauing, or sawtoothing. + **Logging (always active):** Logs a WARNING on every check interval (default + 5000 messages) when cgroup memory usage is at or above 90% of the container + limit. + + **Fail-fast (controlled by ``AIRBYTE_MEMORY_FAIL_FAST`` env var, default + enabled):** Raises ``AirbyteTracedException`` with + ``FailureType.system_error`` when *both*: + + 1. Cgroup usage >= 95% of the container limit (container is near OOM-kill) + 2. Process RSS >= 80% of the container limit (pressure is from non-reclaimable + Python heap, not elastic kernel page cache) + + This dual-condition avoids false positives from SQLite page cache or other + kernel-reclaimable memory that inflates cgroup usage but does not represent + real memory pressure. """ def __init__( self, check_interval: int = _DEFAULT_CHECK_INTERVAL, + fail_fast: Optional[bool] = None, ) -> None: if check_interval < 1: raise ValueError(f"check_interval must be >= 1, got {check_interval}") @@ -48,6 +96,13 @@ def __init__( self._cgroup_version: Optional[int] = None self._probed = False + # Resolve fail-fast setting: explicit arg > env var > default (True) + if fail_fast is not None: + self._fail_fast = fail_fast + else: + env_val = os.environ.get(_ENV_FAIL_FAST, "true").strip().lower() + self._fail_fast = env_val != "false" + def _probe_cgroup(self) -> None: """Detect which cgroup version (if any) is available. @@ -102,14 +157,18 @@ def _read_memory(self) -> Optional[tuple[int, int]]: return None def check_memory_usage(self) -> None: - """Check memory usage and log when above 90%. + """Check memory usage; log at 90% and raise at critical dual-condition. Intended to be called on every message. The monitor internally tracks a message counter and only reads cgroup files every ``check_interval`` messages (default 5000) to minimise I/O overhead. - Logs a WARNING on every check above 90% to provide breadcrumb trails - showing memory trends over the sync lifetime. + **Logging:** WARNING on every check above 90%. + + **Fail-fast (when enabled):** If cgroup usage >= 95% *and* process RSS + >= 80% of the container limit, raises ``AirbyteTracedException`` with + ``FailureType.system_error`` so the platform receives a clear error + message instead of an opaque OOM-kill. This method is a no-op if cgroup files are unavailable. """ @@ -138,3 +197,39 @@ def check_memory_usage(self) -> None: usage_gb, limit_gb, ) + + # Fail-fast: dual-condition check + if self._fail_fast and usage_ratio >= _CRITICAL_THRESHOLD: + rss_bytes = _read_process_rss_bytes() + if rss_bytes is not None: + rss_ratio = rss_bytes / limit_bytes + rss_percent = int(rss_ratio * 100) + if rss_ratio >= _RSS_THRESHOLD: + raise AirbyteTracedException( + message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).", + internal_message=( + f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). " + f"Process RSS: {rss_bytes} bytes ({rss_percent}% of limit). " + f"Thresholds: cgroup >= {int(_CRITICAL_THRESHOLD * 100)}%, " + f"RSS >= {int(_RSS_THRESHOLD * 100)}%." + ), + failure_type=FailureType.system_error, + ) + else: + logger.info( + "Cgroup usage at %d%% but process RSS only %d%% of limit; " + "pressure likely from reclaimable page cache — not raising.", + usage_percent, + rss_percent, + ) + else: + # Cannot read process RSS (non-Linux?) — fall back to cgroup-only + raise AirbyteTracedException( + message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).", + internal_message=( + f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). " + f"Process RSS unavailable (non-Linux environment). " + f"Cgroup threshold: >= {int(_CRITICAL_THRESHOLD * 100)}%." + ), + failure_type=FailureType.system_error, + ) diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index cf8250465..e07374a3c 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -8,18 +8,27 @@ import pytest +from airbyte_cdk.models import FailureType from airbyte_cdk.utils.memory_monitor import ( _CGROUP_V1_LIMIT, _CGROUP_V1_USAGE, _CGROUP_V2_CURRENT, _CGROUP_V2_MAX, + _PROC_SELF_STATUS, MemoryMonitor, + _read_process_rss_bytes, ) +from airbyte_cdk.utils.traced_exception import AirbyteTracedException _MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB _MOCK_USAGE_AT_90 = "910000000\n" # 91% of 1 GB +_MOCK_USAGE_AT_95 = "960000000\n" # 96% of 1 GB _MOCK_LIMIT = "1000000000\n" # 1 GB +# RSS mock values (in kB as they appear in /proc/self/status) +_MOCK_RSS_HIGH = "VmRSS:\t 820000 kB\n" # ~82% of 1 GB (above 80% threshold) +_MOCK_RSS_LOW = "VmRSS:\t 500000 kB\n" # ~50% of 1 GB (below 80% threshold) + def _v2_exists(self: Path) -> bool: return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) @@ -245,3 +254,155 @@ def mock_read_text(self: Path) -> str: ): monitor.check_memory_usage() assert not caplog.records + + +# --------------------------------------------------------------------------- +# _read_process_rss_bytes — unit tests +# --------------------------------------------------------------------------- + + +def test_read_process_rss_bytes_parses_vmrss() -> None: + """Correctly parses VmRSS from /proc/self/status content.""" + status_content = ( + "Name:\tpython3\nVmPeak:\t 1000000 kB\nVmRSS:\t 512000 kB\nVmSwap:\t 0 kB\n" + ) + with patch.object(Path, "read_text", return_value=status_content): + result = _read_process_rss_bytes() + assert result == 512000 * 1024 + + +def test_read_process_rss_bytes_returns_none_on_missing_file() -> None: + """Returns None when /proc/self/status is unreadable.""" + + def raise_oserror(self: Path) -> str: + raise OSError("No such file") + + with patch.object(Path, "read_text", raise_oserror): + assert _read_process_rss_bytes() is None + + +def test_read_process_rss_bytes_returns_none_when_vmrss_absent() -> None: + """Returns None when VmRSS line is not present.""" + with patch.object(Path, "read_text", return_value="Name:\tpython3\n"): + assert _read_process_rss_bytes() is None + + +# --------------------------------------------------------------------------- +# check_memory_usage — fail-fast (dual-condition) +# --------------------------------------------------------------------------- + + +def _proc_status_read(rss_content: str): + """Return a mock read_text that serves cgroup v2 AND /proc/self/status.""" + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return _MOCK_USAGE_AT_95 + if self == _CGROUP_V2_MAX: + return _MOCK_LIMIT + if self == _PROC_SELF_STATUS: + return rss_content + return "" + + return mock_read_text + + +def test_raises_when_both_cgroup_and_rss_above_thresholds() -> None: + """Fail-fast raises AirbyteTracedException when both cgroup >= 95% and RSS >= 80%.""" + monitor = MemoryMonitor(check_interval=1, fail_fast=True) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _proc_status_read(_MOCK_RSS_HIGH)), + ): + with pytest.raises(AirbyteTracedException) as exc_info: + monitor.check_memory_usage() + assert exc_info.value.failure_type == FailureType.system_error + assert "critical threshold" in (exc_info.value.message or "") + assert "96%" in (exc_info.value.message or "") + + +def test_no_raise_when_cgroup_high_but_rss_low(caplog: pytest.LogCaptureFixture) -> None: + """No exception when cgroup >= 95% but RSS < 80% (page cache scenario).""" + monitor = MemoryMonitor(check_interval=1, fail_fast=True) + with ( + caplog.at_level(logging.INFO, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _proc_status_read(_MOCK_RSS_LOW)), + ): + monitor.check_memory_usage() # Should NOT raise + # Should log an info message about page cache + info_records = [r for r in caplog.records if r.levelno == logging.INFO] + assert any("page cache" in r.message for r in info_records) + + +def test_no_raise_when_cgroup_below_critical() -> None: + """No exception when cgroup < 95%, even with high RSS.""" + monitor = MemoryMonitor(check_interval=1, fail_fast=True) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + ): + monitor.check_memory_usage() # Should NOT raise + + +def test_raises_when_rss_unavailable_and_cgroup_critical() -> None: + """Falls back to cgroup-only and raises when /proc/self/status is unavailable.""" + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return _MOCK_USAGE_AT_95 + if self == _CGROUP_V2_MAX: + return _MOCK_LIMIT + if self == _PROC_SELF_STATUS: + raise OSError("No such file") + return "" + + monitor = MemoryMonitor(check_interval=1, fail_fast=True) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): + with pytest.raises(AirbyteTracedException) as exc_info: + monitor.check_memory_usage() + assert exc_info.value.failure_type == FailureType.system_error + assert "RSS unavailable" in (exc_info.value.internal_message or "") + + +# --------------------------------------------------------------------------- +# check_memory_usage — fail-fast feature flag +# --------------------------------------------------------------------------- + + +def test_fail_fast_disabled_via_constructor() -> None: + """No exception when fail_fast=False even at critical thresholds.""" + monitor = MemoryMonitor(check_interval=1, fail_fast=False) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _proc_status_read(_MOCK_RSS_HIGH)), + ): + monitor.check_memory_usage() # Should NOT raise + + +def test_fail_fast_disabled_via_env_var() -> None: + """No exception when AIRBYTE_MEMORY_FAIL_FAST=false.""" + with patch.dict("os.environ", {"AIRBYTE_MEMORY_FAIL_FAST": "false"}): + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _proc_status_read(_MOCK_RSS_HIGH)), + ): + monitor.check_memory_usage() # Should NOT raise + + +def test_fail_fast_enabled_by_default() -> None: + """Fail-fast is enabled by default (no env var, no explicit arg).""" + with patch.dict("os.environ", {}, clear=False): + monitor = MemoryMonitor(check_interval=1) + assert monitor._fail_fast is True + + +def test_fail_fast_constructor_overrides_env_var() -> None: + """Explicit fail_fast=True overrides env var set to false.""" + with patch.dict("os.environ", {"AIRBYTE_MEMORY_FAIL_FAST": "false"}): + monitor = MemoryMonitor(check_interval=1, fail_fast=True) + assert monitor._fail_fast is True From 748212f59392d950906d43ef27ad4adfa2b27cc2 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 23:34:49 +0000 Subject: [PATCH 2/9] refactor: use RssAnon instead of VmRSS, raise critical threshold to 98%, log+skip on missing RssAnon Address review feedback: - Parse RssAnon from /proc/self/status instead of VmRSS to avoid inflation from file-backed/shared resident pages - Raise _CRITICAL_THRESHOLD from 95% to 98% for more conservative fail-fast behavior - When RssAnon is unavailable, log a warning and skip fail-fast instead of falling back to cgroup-only raising (stays truly dual-condition) - Add test proving metric choice matters (VmRSS high but RssAnon low) - Update all docstrings/comments to say 'anonymous resident memory' Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 86 +++++++++++--------- unit_tests/utils/test_memory_monitor.py | 100 +++++++++++++++--------- 2 files changed, 108 insertions(+), 78 deletions(-) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 33fba9bf7..75dfe1507 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -22,7 +22,7 @@ _CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes") _CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes") -# Process-level RSS from /proc/self/status (Linux only, no extra dependency) +# Process-level anonymous RSS from /proc/self/status (Linux only, no extra dependency) _PROC_SELF_STATUS = Path("/proc/self/status") # Log when usage is at or above 90% @@ -30,10 +30,11 @@ # Raise AirbyteTracedException when BOTH conditions are met: # 1. cgroup usage >= critical threshold -# 2. process RSS >= RSS threshold of the container limit -# This dual-condition avoids false positives from reclaimable kernel page cache. -_CRITICAL_THRESHOLD = 0.95 -_RSS_THRESHOLD = 0.80 +# 2. process anonymous RSS (RssAnon) >= anon threshold of the container limit +# This dual-condition avoids false positives from reclaimable kernel page cache +# and file-backed / shared resident pages that inflate VmRSS. +_CRITICAL_THRESHOLD = 0.98 +_ANON_RSS_THRESHOLD = 0.80 # Check interval (every N messages) _DEFAULT_CHECK_INTERVAL = 5000 @@ -42,16 +43,22 @@ _ENV_FAIL_FAST = "AIRBYTE_MEMORY_FAIL_FAST" -def _read_process_rss_bytes() -> Optional[int]: - """Read current process RSS from /proc/self/status. +def _read_process_anon_rss_bytes() -> Optional[int]: + """Read process-private anonymous resident memory from /proc/self/status. - Returns RSS in bytes, or None if unavailable (non-Linux, permission error, etc.). + Parses the ``RssAnon`` field which represents private anonymous pages — the + closest proxy for Python-heap memory pressure. Unlike ``VmRSS`` (which is + ``RssAnon + RssFile + RssShmem``), ``RssAnon`` is not inflated by mmap'd + file-backed or shared resident pages. + + Returns anonymous RSS in bytes, or None if unavailable (non-Linux, + permission error, or ``RssAnon`` field not present in the kernel). """ try: status_text = _PROC_SELF_STATUS.read_text() for line in status_text.splitlines(): - if line.startswith("VmRSS:"): - # Format: "VmRSS: 12345 kB" + if line.startswith("RssAnon:"): + # Format: "RssAnon: 12345 kB" parts = line.split() if len(parts) >= 2: return int(parts[1]) * 1024 # Convert kB to bytes @@ -75,13 +82,16 @@ class MemoryMonitor: enabled):** Raises ``AirbyteTracedException`` with ``FailureType.system_error`` when *both*: - 1. Cgroup usage >= 95% of the container limit (container is near OOM-kill) - 2. Process RSS >= 80% of the container limit (pressure is from non-reclaimable - Python heap, not elastic kernel page cache) + 1. Cgroup usage >= 98% of the container limit (container is near OOM-kill) + 2. Process anonymous RSS (``RssAnon``) >= 80% of the container limit + (pressure is from process-private anonymous memory, not elastic kernel + page cache or file-backed resident pages) - This dual-condition avoids false positives from SQLite page cache or other - kernel-reclaimable memory that inflates cgroup usage but does not represent - real memory pressure. + This dual-condition avoids false positives from SQLite mmap'd pages, shared + memory, or other kernel-reclaimable memory that inflates cgroup usage but + does not represent real process memory pressure. If ``RssAnon`` is not + available, the monitor logs a warning and skips fail-fast rather than + falling back to cgroup-only raising. """ def __init__( @@ -165,10 +175,11 @@ def check_memory_usage(self) -> None: **Logging:** WARNING on every check above 90%. - **Fail-fast (when enabled):** If cgroup usage >= 95% *and* process RSS - >= 80% of the container limit, raises ``AirbyteTracedException`` with - ``FailureType.system_error`` so the platform receives a clear error - message instead of an opaque OOM-kill. + **Fail-fast (when enabled):** If cgroup usage >= 98% *and* process + anonymous RSS (``RssAnon``) >= 80% of the container limit, raises + ``AirbyteTracedException`` with ``FailureType.system_error`` so the + platform receives a clear error message instead of an opaque OOM-kill. + If ``RssAnon`` is unavailable, logs a warning and skips fail-fast. This method is a no-op if cgroup files are unavailable. """ @@ -200,36 +211,33 @@ def check_memory_usage(self) -> None: # Fail-fast: dual-condition check if self._fail_fast and usage_ratio >= _CRITICAL_THRESHOLD: - rss_bytes = _read_process_rss_bytes() - if rss_bytes is not None: - rss_ratio = rss_bytes / limit_bytes - rss_percent = int(rss_ratio * 100) - if rss_ratio >= _RSS_THRESHOLD: + anon_rss_bytes = _read_process_anon_rss_bytes() + if anon_rss_bytes is not None: + anon_ratio = anon_rss_bytes / limit_bytes + anon_percent = int(anon_ratio * 100) + if anon_ratio >= _ANON_RSS_THRESHOLD: raise AirbyteTracedException( message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).", internal_message=( f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). " - f"Process RSS: {rss_bytes} bytes ({rss_percent}% of limit). " + f"Process anonymous RSS (RssAnon): {anon_rss_bytes} bytes ({anon_percent}% of limit). " f"Thresholds: cgroup >= {int(_CRITICAL_THRESHOLD * 100)}%, " - f"RSS >= {int(_RSS_THRESHOLD * 100)}%." + f"anonymous RSS >= {int(_ANON_RSS_THRESHOLD * 100)}%." ), failure_type=FailureType.system_error, ) else: logger.info( - "Cgroup usage at %d%% but process RSS only %d%% of limit; " - "pressure likely from reclaimable page cache — not raising.", + "Cgroup usage at %d%% but process anonymous RSS only %d%% of limit; " + "pressure likely from file-backed or reclaimable pages — not raising.", usage_percent, - rss_percent, + anon_percent, ) else: - # Cannot read process RSS (non-Linux?) — fall back to cgroup-only - raise AirbyteTracedException( - message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).", - internal_message=( - f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). " - f"Process RSS unavailable (non-Linux environment). " - f"Cgroup threshold: >= {int(_CRITICAL_THRESHOLD * 100)}%." - ), - failure_type=FailureType.system_error, + # RssAnon unavailable — log and skip rather than cgroup-only raising, + # so the implementation stays truly dual-condition. + logger.warning( + "Cgroup usage at %d%% but RssAnon unavailable from /proc/self/status; " + "skipping fail-fast (cannot confirm anonymous memory pressure).", + usage_percent, ) diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index e07374a3c..f0e17fad1 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -16,18 +16,24 @@ _CGROUP_V2_MAX, _PROC_SELF_STATUS, MemoryMonitor, - _read_process_rss_bytes, + _read_process_anon_rss_bytes, ) from airbyte_cdk.utils.traced_exception import AirbyteTracedException _MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB _MOCK_USAGE_AT_90 = "910000000\n" # 91% of 1 GB -_MOCK_USAGE_AT_95 = "960000000\n" # 96% of 1 GB +_MOCK_USAGE_AT_97 = "970000000\n" # 97% of 1 GB (below 98% critical threshold) +_MOCK_USAGE_AT_98 = "980000000\n" # 98% of 1 GB (at critical threshold) _MOCK_LIMIT = "1000000000\n" # 1 GB -# RSS mock values (in kB as they appear in /proc/self/status) -_MOCK_RSS_HIGH = "VmRSS:\t 820000 kB\n" # ~82% of 1 GB (above 80% threshold) -_MOCK_RSS_LOW = "VmRSS:\t 500000 kB\n" # ~50% of 1 GB (below 80% threshold) +# Anonymous RSS mock values (in kB as they appear in /proc/self/status RssAnon field). +# VmRSS is intentionally kept high in the "low anon" mock to prove the metric choice matters: +# VmRSS can be inflated by file-backed pages while RssAnon stays low. +_MOCK_ANON_HIGH = "RssAnon:\t 820000 kB\n" # ~82% of 1 GB (above 80% threshold) +_MOCK_ANON_LOW_VMRSS_HIGH = ( + "VmRSS:\t 900000 kB\n" # ~90% of 1 GB — high total RSS + "RssAnon:\t 500000 kB\n" # ~50% of 1 GB — low anonymous RSS +) def _v2_exists(self: Path) -> bool: @@ -257,34 +263,42 @@ def mock_read_text(self: Path) -> str: # --------------------------------------------------------------------------- -# _read_process_rss_bytes — unit tests +# _read_process_anon_rss_bytes — unit tests # --------------------------------------------------------------------------- -def test_read_process_rss_bytes_parses_vmrss() -> None: - """Correctly parses VmRSS from /proc/self/status content.""" +def test_read_process_anon_rss_bytes_parses_rssanon() -> None: + """Correctly parses RssAnon from /proc/self/status content.""" status_content = ( - "Name:\tpython3\nVmPeak:\t 1000000 kB\nVmRSS:\t 512000 kB\nVmSwap:\t 0 kB\n" + "Name:\tpython3\nVmRSS:\t 1000000 kB\nRssAnon:\t 512000 kB\nRssShmem:\t 0 kB\n" ) with patch.object(Path, "read_text", return_value=status_content): - result = _read_process_rss_bytes() + result = _read_process_anon_rss_bytes() assert result == 512000 * 1024 -def test_read_process_rss_bytes_returns_none_on_missing_file() -> None: +def test_read_process_anon_rss_bytes_returns_none_on_missing_file() -> None: """Returns None when /proc/self/status is unreadable.""" def raise_oserror(self: Path) -> str: raise OSError("No such file") with patch.object(Path, "read_text", raise_oserror): - assert _read_process_rss_bytes() is None + assert _read_process_anon_rss_bytes() is None + + +def test_read_process_anon_rss_bytes_returns_none_when_rssanon_absent() -> None: + """Returns None when RssAnon line is not present (e.g. older kernel).""" + with patch.object(Path, "read_text", return_value="Name:\tpython3\nVmRSS:\t 512000 kB\n"): + assert _read_process_anon_rss_bytes() is None -def test_read_process_rss_bytes_returns_none_when_vmrss_absent() -> None: - """Returns None when VmRSS line is not present.""" - with patch.object(Path, "read_text", return_value="Name:\tpython3\n"): - assert _read_process_rss_bytes() is None +def test_read_process_anon_rss_bytes_ignores_vmrss() -> None: + """Ensures the parser reads RssAnon specifically, not VmRSS.""" + # Only VmRSS present, no RssAnon — should return None + status_content = "VmRSS:\t 900000 kB\n" + with patch.object(Path, "read_text", return_value=status_content): + assert _read_process_anon_rss_bytes() is None # --------------------------------------------------------------------------- @@ -292,65 +306,73 @@ def test_read_process_rss_bytes_returns_none_when_vmrss_absent() -> None: # --------------------------------------------------------------------------- -def _proc_status_read(rss_content: str): +def _proc_status_read(anon_content: str, usage: str = _MOCK_USAGE_AT_98): """Return a mock read_text that serves cgroup v2 AND /proc/self/status.""" def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_AT_95 + return usage if self == _CGROUP_V2_MAX: return _MOCK_LIMIT if self == _PROC_SELF_STATUS: - return rss_content + return anon_content return "" return mock_read_text -def test_raises_when_both_cgroup_and_rss_above_thresholds() -> None: - """Fail-fast raises AirbyteTracedException when both cgroup >= 95% and RSS >= 80%.""" +def test_raises_when_both_cgroup_and_anon_rss_above_thresholds() -> None: + """Fail-fast raises AirbyteTracedException when both cgroup >= 98% and RssAnon >= 80%.""" monitor = MemoryMonitor(check_interval=1, fail_fast=True) with ( patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _proc_status_read(_MOCK_RSS_HIGH)), + patch.object(Path, "read_text", _proc_status_read(_MOCK_ANON_HIGH)), ): with pytest.raises(AirbyteTracedException) as exc_info: monitor.check_memory_usage() assert exc_info.value.failure_type == FailureType.system_error assert "critical threshold" in (exc_info.value.message or "") - assert "96%" in (exc_info.value.message or "") + assert "98%" in (exc_info.value.message or "") + assert "anonymous RSS" in (exc_info.value.internal_message or "") + +def test_no_raise_when_cgroup_high_but_anon_rss_low(caplog: pytest.LogCaptureFixture) -> None: + """No exception when cgroup >= 98% but RssAnon < 80% (file-backed pages scenario). -def test_no_raise_when_cgroup_high_but_rss_low(caplog: pytest.LogCaptureFixture) -> None: - """No exception when cgroup >= 95% but RSS < 80% (page cache scenario).""" + This test also proves the metric choice matters: VmRSS is 90% (high) but + RssAnon is only 50% (low), so the pressure is from file-backed pages. + """ monitor = MemoryMonitor(check_interval=1, fail_fast=True) with ( caplog.at_level(logging.INFO, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _proc_status_read(_MOCK_RSS_LOW)), + patch.object(Path, "read_text", _proc_status_read(_MOCK_ANON_LOW_VMRSS_HIGH)), ): monitor.check_memory_usage() # Should NOT raise - # Should log an info message about page cache info_records = [r for r in caplog.records if r.levelno == logging.INFO] - assert any("page cache" in r.message for r in info_records) + assert any("file-backed" in r.message for r in info_records) def test_no_raise_when_cgroup_below_critical() -> None: - """No exception when cgroup < 95%, even with high RSS.""" + """No exception when cgroup at 97% (< 98% threshold), even with high RssAnon.""" monitor = MemoryMonitor(check_interval=1, fail_fast=True) with ( patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch.object( + Path, "read_text", _proc_status_read(_MOCK_ANON_HIGH, usage=_MOCK_USAGE_AT_97) + ), ): monitor.check_memory_usage() # Should NOT raise -def test_raises_when_rss_unavailable_and_cgroup_critical() -> None: - """Falls back to cgroup-only and raises when /proc/self/status is unavailable.""" +def test_no_raise_when_anon_rss_unavailable_and_cgroup_critical( + caplog: pytest.LogCaptureFixture, +) -> None: + """Logs warning and skips fail-fast when RssAnon is unavailable (stays truly dual-condition).""" def mock_read_text(self: Path) -> str: if self == _CGROUP_V2_CURRENT: - return _MOCK_USAGE_AT_95 + return _MOCK_USAGE_AT_98 if self == _CGROUP_V2_MAX: return _MOCK_LIMIT if self == _PROC_SELF_STATUS: @@ -359,13 +381,13 @@ def mock_read_text(self: Path) -> str: monitor = MemoryMonitor(check_interval=1, fail_fast=True) with ( + caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), patch.object(Path, "read_text", mock_read_text), ): - with pytest.raises(AirbyteTracedException) as exc_info: - monitor.check_memory_usage() - assert exc_info.value.failure_type == FailureType.system_error - assert "RSS unavailable" in (exc_info.value.internal_message or "") + monitor.check_memory_usage() # Should NOT raise + warning_records = [r for r in caplog.records if r.levelno == logging.WARNING] + assert any("RssAnon unavailable" in r.message for r in warning_records) # --------------------------------------------------------------------------- @@ -378,7 +400,7 @@ def test_fail_fast_disabled_via_constructor() -> None: monitor = MemoryMonitor(check_interval=1, fail_fast=False) with ( patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _proc_status_read(_MOCK_RSS_HIGH)), + patch.object(Path, "read_text", _proc_status_read(_MOCK_ANON_HIGH)), ): monitor.check_memory_usage() # Should NOT raise @@ -389,7 +411,7 @@ def test_fail_fast_disabled_via_env_var() -> None: monitor = MemoryMonitor(check_interval=1) with ( patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _proc_status_read(_MOCK_RSS_HIGH)), + patch.object(Path, "read_text", _proc_status_read(_MOCK_ANON_HIGH)), ): monitor.check_memory_usage() # Should NOT raise From aed6be59d807050d4fb8fa8124df96a4c317860c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 23:51:35 +0000 Subject: [PATCH 3/9] fix: avoid yield-in-finally and isolate env var in default test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Copilot review feedback: - Restructure entrypoint.py to catch the fail-fast exception around check_memory_usage(), flush queued messages, then re-raise — avoids yielding inside a finally block which can trigger RuntimeError on GeneratorExit. - Use clear=True in patch.dict for test_fail_fast_enabled_by_default to ensure AIRBYTE_MEMORY_FAIL_FAST is truly unset in test env. Co-Authored-By: bot_apk --- airbyte_cdk/entrypoint.py | 24 ++++++++++++++---------- unit_tests/utils/test_memory_monitor.py | 2 +- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 27079af3d..c7540aac2 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -279,17 +279,21 @@ def read( # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) - try: - for message in self.source.read(self.logger, config, catalog, state): - yield self.handle_record_counts(message, stream_message_counter) + for message in self.source.read(self.logger, config, catalog, state): + yield self.handle_record_counts(message, stream_message_counter) + try: self._memory_monitor.check_memory_usage() - finally: - # Flush queued messages (state checkpoints, logs) regardless of whether - # the read completed normally or was interrupted by a memory fail-fast - # exception. This ensures the platform receives the last committed state - # so the next sync can resume from the correct checkpoint. - for message in self._emit_queued_messages(self.source): - yield self.handle_record_counts(message, stream_message_counter) + except Exception: + # Flush queued messages (state checkpoints, logs) before propagating + # a memory fail-fast (or other) exception, so the platform receives + # the last committed state for the next sync. + for queued_message in self._emit_queued_messages(self.source): + yield self.handle_record_counts(queued_message, stream_message_counter) + raise + + # Flush queued messages after normal completion of the read loop. + for message in self._emit_queued_messages(self.source): + yield self.handle_record_counts(message, stream_message_counter) @staticmethod def handle_record_counts( diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index f0e17fad1..e834fc13f 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -418,7 +418,7 @@ def test_fail_fast_disabled_via_env_var() -> None: def test_fail_fast_enabled_by_default() -> None: """Fail-fast is enabled by default (no env var, no explicit arg).""" - with patch.dict("os.environ", {}, clear=False): + with patch.dict("os.environ", {}, clear=True): monitor = MemoryMonitor(check_interval=1) assert monitor._fail_fast is True From af3ca827b1151b2a9b52c4344cbcc63ba10a79e2 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 23:54:53 +0000 Subject: [PATCH 4/9] refactor: raise logging threshold from 90% to 95% Per review feedback, increase _MEMORY_THRESHOLD from 0.90 to 0.95 to reduce noise from the warning log. Update all related docstrings, comments, and test assertions accordingly. Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 10 ++++---- unit_tests/utils/test_memory_monitor.py | 31 +++++++++++++------------ 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 75dfe1507..2685d25bf 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -25,8 +25,8 @@ # Process-level anonymous RSS from /proc/self/status (Linux only, no extra dependency) _PROC_SELF_STATUS = Path("/proc/self/status") -# Log when usage is at or above 90% -_MEMORY_THRESHOLD = 0.90 +# Log when usage is at or above 95% +_MEMORY_THRESHOLD = 0.95 # Raise AirbyteTracedException when BOTH conditions are met: # 1. cgroup usage >= critical threshold @@ -75,7 +75,7 @@ class MemoryMonitor: If neither is found (local dev / CI), all subsequent calls are instant no-ops. **Logging (always active):** Logs a WARNING on every check interval (default - 5000 messages) when cgroup memory usage is at or above 90% of the container + 5000 messages) when cgroup memory usage is at or above 95% of the container limit. **Fail-fast (controlled by ``AIRBYTE_MEMORY_FAIL_FAST`` env var, default @@ -167,13 +167,13 @@ def _read_memory(self) -> Optional[tuple[int, int]]: return None def check_memory_usage(self) -> None: - """Check memory usage; log at 90% and raise at critical dual-condition. + """Check memory usage; log at 95% and raise at critical dual-condition. Intended to be called on every message. The monitor internally tracks a message counter and only reads cgroup files every ``check_interval`` messages (default 5000) to minimise I/O overhead. - **Logging:** WARNING on every check above 90%. + **Logging:** WARNING on every check above 95%. **Fail-fast (when enabled):** If cgroup usage >= 98% *and* process anonymous RSS (``RssAnon``) >= 80% of the container limit, raises diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index e834fc13f..1ffef410d 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -21,7 +21,8 @@ from airbyte_cdk.utils.traced_exception import AirbyteTracedException _MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB -_MOCK_USAGE_AT_90 = "910000000\n" # 91% of 1 GB +_MOCK_USAGE_AT_90 = "910000000\n" # 91% of 1 GB (below 95% logging threshold) +_MOCK_USAGE_AT_95 = "960000000\n" # 96% of 1 GB (above 95% logging threshold) _MOCK_USAGE_AT_97 = "970000000\n" # 97% of 1 GB (below 98% critical threshold) _MOCK_USAGE_AT_98 = "980000000\n" # 98% of 1 GB (at critical threshold) _MOCK_LIMIT = "1000000000\n" # 1 GB @@ -120,43 +121,43 @@ def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None: def test_no_warning_below_threshold(caplog: pytest.LogCaptureFixture) -> None: - """No warning should be emitted when usage is below 90%.""" + """No warning should be emitted when usage is below 95%.""" monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_BELOW)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), ): monitor.check_memory_usage() assert not caplog.records # --------------------------------------------------------------------------- -# check_memory_usage — at/above 90% threshold +# check_memory_usage — at/above 95% threshold # --------------------------------------------------------------------------- -def test_logs_at_90_percent(caplog: pytest.LogCaptureFixture) -> None: - """Warning log should be emitted at 91% usage (above 90% threshold).""" +def test_logs_at_95_percent(caplog: pytest.LogCaptureFixture) -> None: + """Warning log should be emitted at 96% usage (above 95% threshold).""" monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)), ): monitor.check_memory_usage() assert len(caplog.records) == 1 - assert "91%" in caplog.records[0].message + assert "96%" in caplog.records[0].message -def test_logs_on_every_check_above_90_percent(caplog: pytest.LogCaptureFixture) -> None: - """Warning should be logged on EVERY check interval when above 90%, not just once.""" +def test_logs_on_every_check_above_95_percent(caplog: pytest.LogCaptureFixture) -> None: + """Warning should be logged on EVERY check interval when above 95%, not just once.""" monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)), ): monitor.check_memory_usage() monitor.check_memory_usage() @@ -165,7 +166,7 @@ def test_logs_on_every_check_above_90_percent(caplog: pytest.LogCaptureFixture) # All three checks should produce a warning (no one-shot flag) assert len(caplog.records) == 3 for record in caplog.records: - assert "91%" in record.message + assert "96%" in record.message # --------------------------------------------------------------------------- @@ -178,7 +179,7 @@ def test_cgroup_v1_emits_warning(caplog: pytest.LogCaptureFixture) -> None: def mock_read_text(self: Path) -> str: if self == _CGROUP_V1_USAGE: - return _MOCK_USAGE_AT_90 + return _MOCK_USAGE_AT_95 if self == _CGROUP_V1_LIMIT: return _MOCK_LIMIT return "" @@ -192,7 +193,7 @@ def mock_read_text(self: Path) -> str: monitor.check_memory_usage() assert len(caplog.records) == 1 - assert "91%" in caplog.records[0].message + assert "96%" in caplog.records[0].message # --------------------------------------------------------------------------- @@ -206,7 +207,7 @@ def test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixtur with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)), ): # First 4999 calls should be skipped for _ in range(4999): From 10fc815a6319ed2247861d0e8ec1b346fcdaf135 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 23:56:52 +0000 Subject: [PATCH 5/9] refactor: raise anonymous RSS threshold from 80% to 90% Per review feedback, increase _ANON_RSS_THRESHOLD from 0.80 to 0.90 to align with the conservative 98% cgroup threshold. Start stricter and relax later once fleet data is available. Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 6 +++--- unit_tests/utils/test_memory_monitor.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 2685d25bf..81718be5c 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -34,7 +34,7 @@ # This dual-condition avoids false positives from reclaimable kernel page cache # and file-backed / shared resident pages that inflate VmRSS. _CRITICAL_THRESHOLD = 0.98 -_ANON_RSS_THRESHOLD = 0.80 +_ANON_RSS_THRESHOLD = 0.90 # Check interval (every N messages) _DEFAULT_CHECK_INTERVAL = 5000 @@ -83,7 +83,7 @@ class MemoryMonitor: ``FailureType.system_error`` when *both*: 1. Cgroup usage >= 98% of the container limit (container is near OOM-kill) - 2. Process anonymous RSS (``RssAnon``) >= 80% of the container limit + 2. Process anonymous RSS (``RssAnon``) >= 90% of the container limit (pressure is from process-private anonymous memory, not elastic kernel page cache or file-backed resident pages) @@ -176,7 +176,7 @@ def check_memory_usage(self) -> None: **Logging:** WARNING on every check above 95%. **Fail-fast (when enabled):** If cgroup usage >= 98% *and* process - anonymous RSS (``RssAnon``) >= 80% of the container limit, raises + anonymous RSS (``RssAnon``) >= 90% of the container limit, raises ``AirbyteTracedException`` with ``FailureType.system_error`` so the platform receives a clear error message instead of an opaque OOM-kill. If ``RssAnon`` is unavailable, logs a warning and skips fail-fast. diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 1ffef410d..83705bd0e 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -30,7 +30,7 @@ # Anonymous RSS mock values (in kB as they appear in /proc/self/status RssAnon field). # VmRSS is intentionally kept high in the "low anon" mock to prove the metric choice matters: # VmRSS can be inflated by file-backed pages while RssAnon stays low. -_MOCK_ANON_HIGH = "RssAnon:\t 820000 kB\n" # ~82% of 1 GB (above 80% threshold) +_MOCK_ANON_HIGH = "RssAnon:\t 920000 kB\n" # ~92% of 1 GB (above 90% threshold) _MOCK_ANON_LOW_VMRSS_HIGH = ( "VmRSS:\t 900000 kB\n" # ~90% of 1 GB — high total RSS "RssAnon:\t 500000 kB\n" # ~50% of 1 GB — low anonymous RSS From 94b664fc3bfcb7ceb67055081ab07e3c2dea2552 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 23:58:36 +0000 Subject: [PATCH 6/9] docs: fix stale test docstrings to reference 90% RssAnon threshold Co-Authored-By: bot_apk --- unit_tests/utils/test_memory_monitor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 83705bd0e..3ecb4e181 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -323,7 +323,7 @@ def mock_read_text(self: Path) -> str: def test_raises_when_both_cgroup_and_anon_rss_above_thresholds() -> None: - """Fail-fast raises AirbyteTracedException when both cgroup >= 98% and RssAnon >= 80%.""" + """Fail-fast raises AirbyteTracedException when both cgroup >= 98% and RssAnon >= 90%.""" monitor = MemoryMonitor(check_interval=1, fail_fast=True) with ( patch.object(Path, "exists", _v2_exists), @@ -338,7 +338,7 @@ def test_raises_when_both_cgroup_and_anon_rss_above_thresholds() -> None: def test_no_raise_when_cgroup_high_but_anon_rss_low(caplog: pytest.LogCaptureFixture) -> None: - """No exception when cgroup >= 98% but RssAnon < 80% (file-backed pages scenario). + """No exception when cgroup >= 98% but RssAnon < 90% (file-backed pages scenario). This test also proves the metric choice matters: VmRSS is 90% (high) but RssAnon is only 50% (low), so the pressure is from file-backed pages. From 6cd346778307a297e91dd3d349bd7cdad5629193 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 23:40:09 +0000 Subject: [PATCH 7/9] refactor: narrow exception catch to AirbyteTracedException, add flush-before-raise test Co-Authored-By: bot_apk --- airbyte_cdk/entrypoint.py | 6 +-- unit_tests/test_entrypoint.py | 73 +++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index c7540aac2..82136b65f 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -283,10 +283,10 @@ def read( yield self.handle_record_counts(message, stream_message_counter) try: self._memory_monitor.check_memory_usage() - except Exception: + except AirbyteTracedException: # Flush queued messages (state checkpoints, logs) before propagating - # a memory fail-fast (or other) exception, so the platform receives - # the last committed state for the next sync. + # the memory fail-fast exception, so the platform receives the last + # committed state for the next sync. for queued_message in self._emit_queued_messages(self.source): yield self.handle_record_counts(queued_message, stream_message_counter) raise diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 520131881..8b0931d42 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -856,3 +856,76 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json( # There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here record_messages = list(filter(lambda message: "RECORD" in message, messages)) assert len(record_messages) == 2 + + +def test_memory_failfast_flushes_queued_state_before_raising(mocker): + """Queued state messages are emitted before AirbyteTracedException propagates from memory monitor.""" + # Build a state message that will sit in the message repository queue + queued_state = AirbyteMessage( + type=Type.STATE, + state=AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="users", namespace=None), + stream_state=AirbyteStateBlob({"cursor": "abc123"}), + ), + ), + ) + + # Set up the message repository mock so consume_queue returns the state on first call + message_repository = MagicMock() + message_repository.consume_queue.side_effect = [ + [], # initial flush in run() before read() + [queued_state], # flush during fail-fast exception handling + [], # final flush in run() finally block + ] + mocker.patch.object( + MockSource, + "message_repository", + new_callable=mocker.PropertyMock, + return_value=message_repository, + ) + + # Source emits one record before memory monitor raises + record = AirbyteMessage( + record=AirbyteRecordMessage(stream="users", data={"id": 1}, emitted_at=1), + type=Type.RECORD, + ) + mocker.patch.object(MockSource, "read_state", return_value={}) + mocker.patch.object(MockSource, "read_catalog", return_value={}) + mocker.patch.object(MockSource, "read", return_value=[record]) + + fail_fast_exc = AirbyteTracedException( + message="Memory usage exceeded critical threshold (98%)", + failure_type=FailureType.system_error, + ) + + config = {"username": "fake"} + mocker.patch.object(MockSource, "read_config", return_value=config) + mocker.patch.object(MockSource, "configure", return_value=config) + + entrypoint_obj = AirbyteEntrypoint(MockSource()) + mocker.patch.object( + entrypoint_obj._memory_monitor, "check_memory_usage", side_effect=fail_fast_exc + ) + + mocker.patch.object( + MockSource, "spec", return_value=ConnectorSpecification(connectionSpecification={}) + ) + + parsed_args = Namespace( + command="read", config="config_path", state="statepath", catalog="catalogpath" + ) + + # Collect all yielded messages before the exception + emitted: list[str] = [] + with pytest.raises(AirbyteTracedException) as exc_info: + for msg in entrypoint_obj.run(parsed_args): + emitted.append(msg) + + assert exc_info.value is fail_fast_exc + + # The record should be yielded first, then the queued state (flushed during exception handling) + state_messages = [m for m in emitted if "STATE" in m] + assert len(state_messages) == 1, "Queued state should be flushed before exception propagates" + assert "abc123" in state_messages[0] From e649a70e14561d5ab766808bdf38a12e76b9926c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 23:42:23 +0000 Subject: [PATCH 8/9] test: enhance flush-before-raise test to verify sourceStats.recordCount and step-by-step generator iteration Co-Authored-By: bot_apk --- unit_tests/test_entrypoint.py | 48 ++++++++++++++++------------------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 8b0931d42..35cd608a3 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -859,8 +859,7 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json( def test_memory_failfast_flushes_queued_state_before_raising(mocker): - """Queued state messages are emitted before AirbyteTracedException propagates from memory monitor.""" - # Build a state message that will sit in the message repository queue + """Record emitted → check_memory_usage raises → queued STATE flushed with recordCount → exception propagates.""" queued_state = AirbyteMessage( type=Type.STATE, state=AirbyteStateMessage( @@ -872,12 +871,10 @@ def test_memory_failfast_flushes_queued_state_before_raising(mocker): ), ) - # Set up the message repository mock so consume_queue returns the state on first call message_repository = MagicMock() message_repository.consume_queue.side_effect = [ - [], # initial flush in run() before read() [queued_state], # flush during fail-fast exception handling - [], # final flush in run() finally block + [], # normal end-of-loop flush (not reached) ] mocker.patch.object( MockSource, @@ -886,7 +883,6 @@ def test_memory_failfast_flushes_queued_state_before_raising(mocker): return_value=message_repository, ) - # Source emits one record before memory monitor raises record = AirbyteMessage( record=AirbyteRecordMessage(stream="users", data={"id": 1}, emitted_at=1), type=Type.RECORD, @@ -900,32 +896,32 @@ def test_memory_failfast_flushes_queued_state_before_raising(mocker): failure_type=FailureType.system_error, ) - config = {"username": "fake"} - mocker.patch.object(MockSource, "read_config", return_value=config) - mocker.patch.object(MockSource, "configure", return_value=config) - entrypoint_obj = AirbyteEntrypoint(MockSource()) mocker.patch.object( entrypoint_obj._memory_monitor, "check_memory_usage", side_effect=fail_fast_exc ) - mocker.patch.object( - MockSource, "spec", return_value=ConnectorSpecification(connectionSpecification={}) - ) + spec = ConnectorSpecification(connectionSpecification={}) + config: dict[str, str] = {} - parsed_args = Namespace( - command="read", config="config_path", state="statepath", catalog="catalogpath" - ) + # Call read() directly to get AirbyteMessage objects (not serialised strings) + gen = entrypoint_obj.read(spec, config, {}, []) - # Collect all yielded messages before the exception - emitted: list[str] = [] - with pytest.raises(AirbyteTracedException) as exc_info: - for msg in entrypoint_obj.run(parsed_args): - emitted.append(msg) + # 1. First yielded message is the RECORD + first = next(gen) + assert first.type == Type.RECORD + assert first.record.stream == "users" # type: ignore[union-attr] - assert exc_info.value is fail_fast_exc + # 2. Second yielded message is the queued STATE (flushed before exception) + second = next(gen) + assert second.type == Type.STATE + assert second.state.stream.stream_state == AirbyteStateBlob({"cursor": "abc123"}) # type: ignore[union-attr] - # The record should be yielded first, then the queued state (flushed during exception handling) - state_messages = [m for m in emitted if "STATE" in m] - assert len(state_messages) == 1, "Queued state should be flushed before exception propagates" - assert "abc123" in state_messages[0] + # 3. The STATE passed through handle_record_counts, so sourceStats.recordCount == 1.0 + assert second.state.sourceStats is not None # type: ignore[union-attr] + assert second.state.sourceStats.recordCount == 1.0 # type: ignore[union-attr] + + # 4. Next iteration re-raises the AirbyteTracedException + with pytest.raises(AirbyteTracedException) as exc_info: + next(gen) + assert exc_info.value is fail_fast_exc From 6c2cd9cc98d1f80d325cde6a1ae1e3be943bb137 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 23:48:37 +0000 Subject: [PATCH 9/9] refactor: remove AIRBYTE_MEMORY_FAIL_FAST env var kill switch, make fail-fast unconditional Co-Authored-By: bot_apk --- airbyte_cdk/utils/memory_monitor.py | 26 ++++---------- unit_tests/utils/test_memory_monitor.py | 48 +++---------------------- 2 files changed, 10 insertions(+), 64 deletions(-) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 81718be5c..8382b3c65 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -5,7 +5,6 @@ """Source-side memory introspection with fail-fast shutdown on memory threshold.""" import logging -import os from pathlib import Path from typing import Optional @@ -39,9 +38,6 @@ # Check interval (every N messages) _DEFAULT_CHECK_INTERVAL = 5000 -# Environment variable to disable fail-fast (set to "false" to disable) -_ENV_FAIL_FAST = "AIRBYTE_MEMORY_FAIL_FAST" - def _read_process_anon_rss_bytes() -> Optional[int]: """Read process-private anonymous resident memory from /proc/self/status. @@ -74,12 +70,10 @@ class MemoryMonitor: ``check_memory_usage()``. Caches which version exists. If neither is found (local dev / CI), all subsequent calls are instant no-ops. - **Logging (always active):** Logs a WARNING on every check interval (default - 5000 messages) when cgroup memory usage is at or above 95% of the container - limit. + **Logging:** Logs a WARNING on every check interval (default 5000 messages) + when cgroup memory usage is at or above 95% of the container limit. - **Fail-fast (controlled by ``AIRBYTE_MEMORY_FAIL_FAST`` env var, default - enabled):** Raises ``AirbyteTracedException`` with + **Fail-fast:** Raises ``AirbyteTracedException`` with ``FailureType.system_error`` when *both*: 1. Cgroup usage >= 98% of the container limit (container is near OOM-kill) @@ -97,7 +91,6 @@ class MemoryMonitor: def __init__( self, check_interval: int = _DEFAULT_CHECK_INTERVAL, - fail_fast: Optional[bool] = None, ) -> None: if check_interval < 1: raise ValueError(f"check_interval must be >= 1, got {check_interval}") @@ -106,13 +99,6 @@ def __init__( self._cgroup_version: Optional[int] = None self._probed = False - # Resolve fail-fast setting: explicit arg > env var > default (True) - if fail_fast is not None: - self._fail_fast = fail_fast - else: - env_val = os.environ.get(_ENV_FAIL_FAST, "true").strip().lower() - self._fail_fast = env_val != "false" - def _probe_cgroup(self) -> None: """Detect which cgroup version (if any) is available. @@ -175,8 +161,8 @@ def check_memory_usage(self) -> None: **Logging:** WARNING on every check above 95%. - **Fail-fast (when enabled):** If cgroup usage >= 98% *and* process - anonymous RSS (``RssAnon``) >= 90% of the container limit, raises + **Fail-fast:** If cgroup usage >= 98% *and* process anonymous RSS + (``RssAnon``) >= 90% of the container limit, raises ``AirbyteTracedException`` with ``FailureType.system_error`` so the platform receives a clear error message instead of an opaque OOM-kill. If ``RssAnon`` is unavailable, logs a warning and skips fail-fast. @@ -210,7 +196,7 @@ def check_memory_usage(self) -> None: ) # Fail-fast: dual-condition check - if self._fail_fast and usage_ratio >= _CRITICAL_THRESHOLD: + if usage_ratio >= _CRITICAL_THRESHOLD: anon_rss_bytes = _read_process_anon_rss_bytes() if anon_rss_bytes is not None: anon_ratio = anon_rss_bytes / limit_bytes diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index 3ecb4e181..99bd254d0 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -324,7 +324,7 @@ def mock_read_text(self: Path) -> str: def test_raises_when_both_cgroup_and_anon_rss_above_thresholds() -> None: """Fail-fast raises AirbyteTracedException when both cgroup >= 98% and RssAnon >= 90%.""" - monitor = MemoryMonitor(check_interval=1, fail_fast=True) + monitor = MemoryMonitor(check_interval=1) with ( patch.object(Path, "exists", _v2_exists), patch.object(Path, "read_text", _proc_status_read(_MOCK_ANON_HIGH)), @@ -343,7 +343,7 @@ def test_no_raise_when_cgroup_high_but_anon_rss_low(caplog: pytest.LogCaptureFix This test also proves the metric choice matters: VmRSS is 90% (high) but RssAnon is only 50% (low), so the pressure is from file-backed pages. """ - monitor = MemoryMonitor(check_interval=1, fail_fast=True) + monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.INFO, logger="airbyte"), patch.object(Path, "exists", _v2_exists), @@ -356,7 +356,7 @@ def test_no_raise_when_cgroup_high_but_anon_rss_low(caplog: pytest.LogCaptureFix def test_no_raise_when_cgroup_below_critical() -> None: """No exception when cgroup at 97% (< 98% threshold), even with high RssAnon.""" - monitor = MemoryMonitor(check_interval=1, fail_fast=True) + monitor = MemoryMonitor(check_interval=1) with ( patch.object(Path, "exists", _v2_exists), patch.object( @@ -380,7 +380,7 @@ def mock_read_text(self: Path) -> str: raise OSError("No such file") return "" - monitor = MemoryMonitor(check_interval=1, fail_fast=True) + monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), @@ -389,43 +389,3 @@ def mock_read_text(self: Path) -> str: monitor.check_memory_usage() # Should NOT raise warning_records = [r for r in caplog.records if r.levelno == logging.WARNING] assert any("RssAnon unavailable" in r.message for r in warning_records) - - -# --------------------------------------------------------------------------- -# check_memory_usage — fail-fast feature flag -# --------------------------------------------------------------------------- - - -def test_fail_fast_disabled_via_constructor() -> None: - """No exception when fail_fast=False even at critical thresholds.""" - monitor = MemoryMonitor(check_interval=1, fail_fast=False) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _proc_status_read(_MOCK_ANON_HIGH)), - ): - monitor.check_memory_usage() # Should NOT raise - - -def test_fail_fast_disabled_via_env_var() -> None: - """No exception when AIRBYTE_MEMORY_FAIL_FAST=false.""" - with patch.dict("os.environ", {"AIRBYTE_MEMORY_FAIL_FAST": "false"}): - monitor = MemoryMonitor(check_interval=1) - with ( - patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _proc_status_read(_MOCK_ANON_HIGH)), - ): - monitor.check_memory_usage() # Should NOT raise - - -def test_fail_fast_enabled_by_default() -> None: - """Fail-fast is enabled by default (no env var, no explicit arg).""" - with patch.dict("os.environ", {}, clear=True): - monitor = MemoryMonitor(check_interval=1) - assert monitor._fail_fast is True - - -def test_fail_fast_constructor_overrides_env_var() -> None: - """Explicit fail_fast=True overrides env var set to false.""" - with patch.dict("os.environ", {"AIRBYTE_MEMORY_FAIL_FAST": "false"}): - monitor = MemoryMonitor(check_interval=1, fail_fast=True) - assert monitor._fail_fast is True