Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,17 @@ def read(
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
for message in self.source.read(self.logger, config, catalog, state):
yield self.handle_record_counts(message, stream_message_counter)
self._memory_monitor.check_memory_usage()
try:
self._memory_monitor.check_memory_usage()
except AirbyteTracedException:
# Flush queued messages (state checkpoints, logs) before propagating
# the memory fail-fast exception, so the platform receives the last
# committed state for the next sync.
for queued_message in self._emit_queued_messages(self.source):
yield self.handle_record_counts(queued_message, stream_message_counter)
raise

# Flush queued messages after normal completion of the read loop.
for message in self._emit_queued_messages(self.source):
yield self.handle_record_counts(message, stream_message_counter)

Expand Down
109 changes: 99 additions & 10 deletions airbyte_cdk/utils/memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#

"""Source-side memory introspection to log memory usage approaching container limits."""
"""Source-side memory introspection with fail-fast shutdown on memory threshold."""

import logging
from pathlib import Path
from typing import Optional

from airbyte_cdk.models import FailureType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

logger = logging.getLogger("airbyte")

# cgroup v2 paths
Expand All @@ -18,23 +21,71 @@
_CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
_CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")

# Log when usage is at or above 90%
_MEMORY_THRESHOLD = 0.90
# Process-level anonymous RSS from /proc/self/status (Linux only, no extra dependency)
_PROC_SELF_STATUS = Path("/proc/self/status")

# Log when usage is at or above 95%
_MEMORY_THRESHOLD = 0.95

# Raise AirbyteTracedException when BOTH conditions are met:
# 1. cgroup usage >= critical threshold
# 2. process anonymous RSS (RssAnon) >= anon threshold of the container limit
# This dual-condition avoids false positives from reclaimable kernel page cache
# and file-backed / shared resident pages that inflate VmRSS.
_CRITICAL_THRESHOLD = 0.98
_ANON_RSS_THRESHOLD = 0.90

# Check interval (every N messages)
_DEFAULT_CHECK_INTERVAL = 5000


def _read_process_anon_rss_bytes(status_path: Optional[Path] = None) -> Optional[int]:
    """Read process-private anonymous resident memory from a procfs status file.

    Parses the ``RssAnon`` field, which represents private anonymous pages — the
    closest proxy for Python-heap memory pressure. Unlike ``VmRSS`` (which is
    ``RssAnon + RssFile + RssShmem``), ``RssAnon`` is not inflated by mmap'd
    file-backed or shared resident pages.

    Args:
        status_path: Status file to parse. When ``None`` (the default) the
            module-level ``_PROC_SELF_STATUS`` is used; it is looked up at call
            time, so existing zero-argument callers see no change.

    Returns:
        Anonymous RSS in bytes, or ``None`` if it is unavailable: the file
        cannot be read (non-Linux, permission error), the ``RssAnon`` field is
        not present, or its value is not an integer.
    """
    path = _PROC_SELF_STATUS if status_path is None else status_path
    try:
        status_text = path.read_text()
        for line in status_text.splitlines():
            if line.startswith("RssAnon:"):
                # Format: "RssAnon:     12345 kB"
                parts = line.split()
                if len(parts) >= 2:
                    return int(parts[1]) * 1024  # Convert kB to bytes
        return None
    except (OSError, ValueError):
        # OSError: file missing or unreadable. ValueError: non-numeric value
        # (also covers UnicodeDecodeError raised while decoding the file).
        return None


class MemoryMonitor:
"""Monitors container memory usage via cgroup files and logs warnings when usage is high.
"""Monitors container memory usage via cgroup files and raises on critical pressure.

Lazily probes cgroup v2 then v1 files on the first call to
``check_memory_usage()``. Caches which version exists.
If neither is found (local dev / CI), all subsequent calls are instant no-ops.

Logs a WARNING on every check interval (default 5000 messages) when memory
usage is at or above 90% of the container limit. This gives breadcrumb
trails showing whether memory is climbing, plateauing, or sawtoothing.
**Logging:** Logs a WARNING on every check interval (default 5000 messages)
when cgroup memory usage is at or above 95% of the container limit.

**Fail-fast:** Raises ``AirbyteTracedException`` with
``FailureType.system_error`` when *both*:

1. Cgroup usage >= 98% of the container limit (container is near OOM-kill)
2. Process anonymous RSS (``RssAnon``) >= 90% of the container limit
(pressure is from process-private anonymous memory, not elastic kernel
page cache or file-backed resident pages)

This dual-condition avoids false positives from SQLite mmap'd pages, shared
memory, or other kernel-reclaimable memory that inflates cgroup usage but
does not represent real process memory pressure. If ``RssAnon`` is not
available, the monitor logs a warning and skips fail-fast rather than
falling back to cgroup-only raising.
"""

def __init__(
Expand Down Expand Up @@ -102,14 +153,19 @@ def _read_memory(self) -> Optional[tuple[int, int]]:
return None

def check_memory_usage(self) -> None:
"""Check memory usage and log when above 90%.
"""Check memory usage; log at 95% and raise at critical dual-condition.

Intended to be called on every message. The monitor internally tracks
a message counter and only reads cgroup files every ``check_interval``
messages (default 5000) to minimise I/O overhead.

Logs a WARNING on every check above 90% to provide breadcrumb trails
showing memory trends over the sync lifetime.
**Logging:** WARNING on every check above 95%.

**Fail-fast:** If cgroup usage >= 98% *and* process anonymous RSS
(``RssAnon``) >= 90% of the container limit, raises
``AirbyteTracedException`` with ``FailureType.system_error`` so the
platform receives a clear error message instead of an opaque OOM-kill.
If ``RssAnon`` is unavailable, logs a warning and skips fail-fast.

This method is a no-op if cgroup files are unavailable.
"""
Expand Down Expand Up @@ -138,3 +194,36 @@ def check_memory_usage(self) -> None:
usage_gb,
limit_gb,
)

# Fail-fast: dual-condition check
if usage_ratio >= _CRITICAL_THRESHOLD:
anon_rss_bytes = _read_process_anon_rss_bytes()
if anon_rss_bytes is not None:
anon_ratio = anon_rss_bytes / limit_bytes
anon_percent = int(anon_ratio * 100)
if anon_ratio >= _ANON_RSS_THRESHOLD:
raise AirbyteTracedException(
message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).",
internal_message=(
f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). "
f"Process anonymous RSS (RssAnon): {anon_rss_bytes} bytes ({anon_percent}% of limit). "
f"Thresholds: cgroup >= {int(_CRITICAL_THRESHOLD * 100)}%, "
f"anonymous RSS >= {int(_ANON_RSS_THRESHOLD * 100)}%."
),
failure_type=FailureType.system_error,
)
else:
logger.info(
"Cgroup usage at %d%% but process anonymous RSS only %d%% of limit; "
"pressure likely from file-backed or reclaimable pages — not raising.",
usage_percent,
anon_percent,
)
else:
# RssAnon unavailable — log and skip rather than cgroup-only raising,
# so the implementation stays truly dual-condition.
logger.warning(
"Cgroup usage at %d%% but RssAnon unavailable from /proc/self/status; "
"skipping fail-fast (cannot confirm anonymous memory pressure).",
usage_percent,
)
69 changes: 69 additions & 0 deletions unit_tests/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,3 +856,72 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json(
# There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here
record_messages = list(filter(lambda message: "RECORD" in message, messages))
assert len(record_messages) == 2


def test_memory_failfast_flushes_queued_state_before_raising(mocker):
    """Fail-fast from the memory monitor must not drop queued messages.

    Scenario: the source yields one RECORD, then ``check_memory_usage`` raises.
    Expected output of ``AirbyteEntrypoint.read``: the RECORD, then the queued
    STATE (stamped with ``recordCount``), and finally the very same exception.
    """
    # --- Messages and the exception used by the scenario -------------------
    users_stream_state = AirbyteStreamState(
        stream_descriptor=StreamDescriptor(name="users", namespace=None),
        stream_state=AirbyteStateBlob({"cursor": "abc123"}),
    )
    pending_state_message = AirbyteMessage(
        type=Type.STATE,
        state=AirbyteStateMessage(type=AirbyteStateType.STREAM, stream=users_stream_state),
    )
    users_record = AirbyteMessage(
        record=AirbyteRecordMessage(stream="users", data={"id": 1}, emitted_at=1),
        type=Type.RECORD,
    )
    memory_error = AirbyteTracedException(
        message="Memory usage exceeded critical threshold (98%)",
        failure_type=FailureType.system_error,
    )

    # --- Source wiring: repository queue first, then the read path ---------
    repository = MagicMock()
    repository.consume_queue.side_effect = [
        [pending_state_message],  # drained by the flush in the fail-fast handler
        [],  # would serve the normal end-of-loop flush (never reached here)
    ]
    mocker.patch.object(
        MockSource,
        "message_repository",
        new_callable=mocker.PropertyMock,
        return_value=repository,
    )
    mocker.patch.object(MockSource, "read_state", return_value={})
    mocker.patch.object(MockSource, "read_catalog", return_value={})
    mocker.patch.object(MockSource, "read", return_value=[users_record])

    # --- Entrypoint whose memory monitor always trips -----------------------
    entrypoint_under_test = AirbyteEntrypoint(MockSource())
    mocker.patch.object(
        entrypoint_under_test._memory_monitor,
        "check_memory_usage",
        side_effect=memory_error,
    )

    connector_spec = ConnectorSpecification(connectionSpecification={})
    config: dict[str, str] = {}

    # read() is driven directly so we inspect AirbyteMessage objects rather
    # than serialised strings.
    messages = entrypoint_under_test.read(connector_spec, config, {}, [])

    # Step 1: the RECORD comes out first.
    emitted_record = next(messages)
    assert emitted_record.type == Type.RECORD
    assert emitted_record.record.stream == "users"  # type: ignore[union-attr]

    # Step 2: the queued STATE is flushed ahead of the exception.
    emitted_state = next(messages)
    assert emitted_state.type == Type.STATE
    assert emitted_state.state.stream.stream_state == AirbyteStateBlob({"cursor": "abc123"})  # type: ignore[union-attr]

    # Step 3: the STATE went through handle_record_counts, hence recordCount == 1.0.
    assert emitted_state.state.sourceStats is not None  # type: ignore[union-attr]
    assert emitted_state.state.sourceStats.recordCount == 1.0  # type: ignore[union-attr]

    # Step 4: advancing once more surfaces the original exception instance.
    with pytest.raises(AirbyteTracedException) as raised:
        next(messages)
    assert raised.value is memory_error
Loading
Loading