Skip to content
13 changes: 13 additions & 0 deletions airbyte_cdk/utils/memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from pathlib import Path
from typing import Optional

try:
import sentry_sdk
except ImportError:
sentry_sdk = None # type: ignore[assignment]
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The soft-import fallback uses sentry_sdk = None # type: ignore[assignment], which suppresses type checking for this symbol. To keep mypy coverage stronger, consider explicitly typing the variable (e.g., sentry_sdk: object | None = None / Optional[Any]) and using # type: ignore[import-not-found] (or a TYPE_CHECKING import) instead of ignoring the assignment.

Suggested change
from typing import Optional
try:
import sentry_sdk
except ImportError:
sentry_sdk = None # type: ignore[assignment]
from typing import Any, Optional
sentry_sdk: Optional[Any]
try:
import sentry_sdk as _sentry_sdk # type: ignore[import-not-found]
sentry_sdk = _sentry_sdk
except ImportError:
sentry_sdk = None

Copilot uses AI. Check for mistakes.

logger = logging.getLogger("airbyte")

# cgroup v2 paths
Expand Down Expand Up @@ -47,6 +52,7 @@ def __init__(
self._message_count = 0
self._cgroup_version: Optional[int] = None
self._probed = False
self._sentry_alerted = False

def _probe_cgroup(self) -> None:
"""Detect which cgroup version (if any) is available.
Expand Down Expand Up @@ -138,3 +144,10 @@ def check_memory_usage(self) -> None:
usage_gb,
limit_gb,
)
if not self._sentry_alerted and sentry_sdk is not None:
self._sentry_alerted = True
sentry_sdk.capture_message(
"Source memory usage at %d%% of container limit (%.2f / %.2f GB)."
% (usage_percent, usage_gb, limit_gb),
level="warning",
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warning message format string is duplicated: once in logger.warning(...) and again in the capture_message(...) call (with manual % formatting). To avoid the two drifting over time (and breaking Sentry fingerprinting expectations), build the message once (e.g., message = ...) and reuse it for both logging and Sentry capture.

Copilot uses AI. Check for mistakes.
)
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_sentry_alerted is set to True before calling sentry_sdk.capture_message(). If capture_message() raises (misconfiguration, transport error, unexpected SDK behavior), this will both crash the sync and permanently suppress future alerts for this monitor instance. Consider wrapping the Sentry call in a broad try/except (and logging at debug) and only setting _sentry_alerted = True after a successful capture.

Suggested change
self._sentry_alerted = True
sentry_sdk.capture_message(
"Source memory usage at %d%% of container limit (%.2f / %.2f GB)."
% (usage_percent, usage_gb, limit_gb),
level="warning",
)
try:
sentry_sdk.capture_message(
"Source memory usage at %d%% of container limit (%.2f / %.2f GB)."
% (usage_percent, usage_gb, limit_gb),
level="warning",
)
self._sentry_alerted = True
except Exception as exc:
logger.debug(
"Failed to send memory usage alert to Sentry: %s",
exc,
exc_info=True,
)

Copilot uses AI. Check for mistakes.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ exclude = [
DEP001 = [
# These are imported but not declared:
"source_declarative_manifest", # Imported only in dynamic import tests, not in main code
"google" # Imported via google.cloud.secretmanager_v1 which is provided by google-cloud-secret-manager
"google", # Imported via google.cloud.secretmanager_v1 which is provided by google-cloud-secret-manager
"sentry_sdk", # Soft-imported in memory_monitor.py; provided at runtime by connector-execution-service
]

# DEP002: Project should not contain unused dependencies.
Expand Down
73 changes: 72 additions & 1 deletion unit_tests/utils/test_memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import logging
from pathlib import Path
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest

Expand Down Expand Up @@ -245,3 +245,74 @@ def mock_read_text(self: Path) -> str:
):
monitor.check_memory_usage()
assert not caplog.records


# ---------------------------------------------------------------------------
# check_memory_usage — Sentry capture_message
# ---------------------------------------------------------------------------


def test_sentry_capture_message_called_on_high_memory() -> None:
"""sentry_sdk.capture_message() should be called once when memory exceeds 90%."""
mock_capture = MagicMock()
monitor = MemoryMonitor(check_interval=1)
with (
patch.object(Path, "exists", _v2_exists),
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as mock_sentry,
):
mock_sentry.capture_message = mock_capture
monitor.check_memory_usage()

mock_capture.assert_called_once()
call_args = mock_capture.call_args
assert "91%" in call_args[0][0]
assert call_args[1]["level"] == "warning"


def test_sentry_capture_message_only_once_per_sync() -> None:
"""sentry_sdk.capture_message() should fire only once even if memory stays high."""
mock_capture = MagicMock()
monitor = MemoryMonitor(check_interval=1)
with (
patch.object(Path, "exists", _v2_exists),
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as mock_sentry,
):
mock_sentry.capture_message = mock_capture
monitor.check_memory_usage()
monitor.check_memory_usage()
monitor.check_memory_usage()

mock_capture.assert_called_once()


def test_sentry_not_called_below_threshold() -> None:
"""sentry_sdk.capture_message() should not be called when memory is below 90%."""
mock_capture = MagicMock()
monitor = MemoryMonitor(check_interval=1)
with (
patch.object(Path, "exists", _v2_exists),
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_BELOW)),
patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as mock_sentry,
):
mock_sentry.capture_message = mock_capture
monitor.check_memory_usage()

mock_capture.assert_not_called()


def test_sentry_unavailable_degrades_gracefully(caplog: pytest.LogCaptureFixture) -> None:
"""When sentry_sdk is None (not installed), warning log should still be emitted."""
monitor = MemoryMonitor(check_interval=1)
with (
caplog.at_level(logging.WARNING, logger="airbyte"),
patch.object(Path, "exists", _v2_exists),
patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)),
patch("airbyte_cdk.utils.memory_monitor.sentry_sdk", None),
):
monitor.check_memory_usage()

# Warning log should still be emitted even when sentry_sdk is unavailable
assert len(caplog.records) == 1
assert "91%" in caplog.records[0].message
Loading