Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions airbyte_cdk/utils/memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from pathlib import Path
from typing import Optional

import sentry_sdk

logger = logging.getLogger("airbyte")

# cgroup v2 paths
Expand Down Expand Up @@ -47,6 +49,7 @@ def __init__(
self._message_count = 0
self._cgroup_version: Optional[int] = None
self._probed = False
self._sentry_alerted = False

def _probe_cgroup(self) -> None:
"""Detect which cgroup version (if any) is available.
Expand Down Expand Up @@ -132,9 +135,16 @@ def check_memory_usage(self) -> None:
limit_gb = limit_bytes / (1024**3)

if usage_ratio >= _MEMORY_THRESHOLD:
logger.warning(
"Source memory usage at %d%% of container limit (%.2f / %.2f GB).",
message = "Source memory usage at %d%% of container limit (%.2f / %.2f GB)." % (
usage_percent,
usage_gb,
limit_gb,
)
logger.warning(message)
if not self._sentry_alerted:
try:
sentry_sdk.capture_message(message, level="warning")
except Exception:
logger.debug("Failed to send high-memory warning to Sentry.", exc_info=True)
else:
self._sentry_alerted = True
66 changes: 65 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ sqlalchemy = {version = "^2.0,!=2.0.36", optional = true }
fastapi = { version = ">=0.116.1", optional = true }
uvicorn = { version = ">=0.35.0", optional = true}
ddtrace = { version = "^3", optional = true }
sentry-sdk = ">=2.0,<3.0"
xmltodict = ">=0.13,<0.15"
anyascii = "^0.3.2"
whenever = ">=0.7.3,<0.9.0"
Expand Down Expand Up @@ -235,7 +236,7 @@ exclude = [
DEP001 = [
# These are imported but not declared:
"source_declarative_manifest", # Imported only in dynamic import tests, not in main code
"google" # Imported via google.cloud.secretmanager_v1 which is provided by google-cloud-secret-manager
"google", # Imported via google.cloud.secretmanager_v1 which is provided by google-cloud-secret-manager
]

# DEP002: Project should not contain unused dependencies.
Expand Down
121 changes: 120 additions & 1 deletion unit_tests/utils/test_memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import logging
from pathlib import Path
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest

Expand Down Expand Up @@ -245,3 +245,122 @@ def mock_read_text(self: Path) -> str:
):
monitor.check_memory_usage()
assert not caplog.records


# ---------------------------------------------------------------------------
# check_memory_usage — Sentry capture_message
# ---------------------------------------------------------------------------


def test_sentry_capture_message_called_on_high_memory() -> None:
    """A single warning-level Sentry message is sent when usage crosses the threshold."""
    memory_monitor = MemoryMonitor(check_interval=1)
    with (
        patch.object(Path, "exists", _v2_exists),
        patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
        patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as fake_sentry,
    ):
        # The auto-created child mock stands in for sentry_sdk.capture_message.
        capture_spy = fake_sentry.capture_message
        memory_monitor.check_memory_usage()

    capture_spy.assert_called_once()
    positional, keyword = capture_spy.call_args
    # First positional argument is the formatted message text.
    assert "91%" in positional[0]
    assert keyword["level"] == "warning"


def test_sentry_capture_message_only_once_per_sync() -> None:
    """Repeated high-memory checks on one monitor produce exactly one Sentry message."""
    memory_monitor = MemoryMonitor(check_interval=1)
    with (
        patch.object(Path, "exists", _v2_exists),
        patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
        patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as fake_sentry,
    ):
        capture_spy = fake_sentry.capture_message
        # Three consecutive checks while memory stays above the threshold.
        for _ in range(3):
            memory_monitor.check_memory_usage()

    capture_spy.assert_called_once()


def test_sentry_not_called_below_threshold() -> None:
    """No Sentry message is sent while usage stays under the threshold."""
    memory_monitor = MemoryMonitor(check_interval=1)
    with (
        patch.object(Path, "exists", _v2_exists),
        patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_BELOW)),
        patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as fake_sentry,
    ):
        capture_spy = fake_sentry.capture_message
        memory_monitor.check_memory_usage()

    capture_spy.assert_not_called()


def test_sentry_capture_message_includes_memory_details() -> None:
    """The Sentry message text carries the usage percentage and the GB figures."""
    memory_monitor = MemoryMonitor(check_interval=1)
    with (
        patch.object(Path, "exists", _v2_exists),
        patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
        patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as fake_sentry,
    ):
        capture_spy = fake_sentry.capture_message
        memory_monitor.check_memory_usage()

    capture_spy.assert_called_once()
    sent_text = capture_spy.call_args.args[0]
    for expected_fragment in ("91%", "0.85 / 0.93 GB"):
        assert expected_fragment in sent_text


def test_sentry_failure_does_not_crash_sync(caplog: pytest.LogCaptureFixture) -> None:
    """An exception raised by Sentry is swallowed: the check still completes and logs."""
    memory_monitor = MemoryMonitor(check_interval=1)
    with (
        caplog.at_level(logging.DEBUG, logger="airbyte"),
        patch.object(Path, "exists", _v2_exists),
        patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
        patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as fake_sentry,
    ):
        fake_sentry.capture_message.side_effect = RuntimeError("Sentry transport error")
        # The call below must return normally even though capture_message raises.
        memory_monitor.check_memory_usage()

    # Split the captured records by level so each expectation is checked separately.
    warnings_seen = []
    debugs_seen = []
    for record in caplog.records:
        if record.levelno == logging.WARNING:
            warnings_seen.append(record)
        elif record.levelno == logging.DEBUG:
            debugs_seen.append(record)

    # The high-memory warning is logged exactly once despite the Sentry failure.
    assert len(warnings_seen) == 1
    assert "91%" in warnings_seen[0].message

    # The Sentry failure itself is reported at debug level.
    assert any(
        "Failed to send high-memory warning to Sentry" in record.message
        for record in debugs_seen
    )

    # The alerted flag stays unset so a later check can try again.
    assert not memory_monitor._sentry_alerted


def test_sentry_retries_after_transient_failure() -> None:
    """A failed Sentry send is attempted again on the following check."""
    memory_monitor = MemoryMonitor(check_interval=1)
    with (
        patch.object(Path, "exists", _v2_exists),
        patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
        patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as fake_sentry,
    ):
        capture_spy = fake_sentry.capture_message
        # First invocation raises, second returns None.
        capture_spy.side_effect = [RuntimeError("transient"), None]

        # Attempt 1: Sentry raises, so the alerted flag remains False.
        memory_monitor.check_memory_usage()
        assert not memory_monitor._sentry_alerted

        # Attempt 2: Sentry succeeds, so the alerted flag becomes True.
        memory_monitor.check_memory_usage()
        assert memory_monitor._sentry_alerted

    assert capture_spy.call_count == 2
Loading