diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 0767ce3bf..d2b3d8bdd 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -8,6 +8,8 @@ from pathlib import Path from typing import Optional +import sentry_sdk + logger = logging.getLogger("airbyte") # cgroup v2 paths @@ -47,6 +49,7 @@ def __init__( self._message_count = 0 self._cgroup_version: Optional[int] = None self._probed = False + self._sentry_alerted = False def _probe_cgroup(self) -> None: """Detect which cgroup version (if any) is available. @@ -132,9 +135,16 @@ def check_memory_usage(self) -> None: limit_gb = limit_bytes / (1024**3) if usage_ratio >= _MEMORY_THRESHOLD: - logger.warning( - "Source memory usage at %d%% of container limit (%.2f / %.2f GB).", + message = "Source memory usage at %d%% of container limit (%.2f / %.2f GB)." % ( usage_percent, usage_gb, limit_gb, ) + logger.warning(message) + if not self._sentry_alerted: + try: + sentry_sdk.capture_message(message, level="warning") + except Exception: + logger.debug("Failed to send high-memory warning to Sentry.", exc_info=True) + else: + self._sentry_alerted = True diff --git a/poetry.lock b/poetry.lock index 55c2fa668..43dbebaac 100644 --- a/poetry.lock +++ b/poetry.lock @@ -5734,6 +5734,70 @@ dev = ["cython-lint (>=0.12.2)", "doit (>=0.36.0)", "mypy (==1.10.0)", "pycodest doc = ["intersphinx_registry", "jupyterlite-pyodide-kernel", "jupyterlite-sphinx (>=0.16.5)", "jupytext", "matplotlib (>=3.5)", "myst-nb", "numpydoc", "pooch", "pydata-sphinx-theme (>=0.15.2)", "sphinx (>=5.0.0,<8.0.0)", "sphinx-copybutton", "sphinx-design (>=0.4.0)"] test = ["Cython", "array-api-strict (>=2.0,<2.1.1)", "asv", "gmpy2", "hypothesis (>=6.30)", "meson", "mpmath", "ninja", "pooch", "pytest", "pytest-cov", "pytest-timeout", "pytest-xdist", "scikit-umfpack", "threadpoolctl"] +[[package]] +name = "sentry-sdk" +version = "2.55.0" +description = "Python client for Sentry (https://sentry.io)" +optional = false +python-versions = ">=3.6" +groups = ["main"] +markers = "python_version <= \"3.11\" or python_version >= \"3.12\"" +files = [ + {file = "sentry_sdk-2.55.0-py2.py3-none-any.whl", hash = "sha256:97026981cb15699394474a196b88503a393cbc58d182ece0d3abe12b9bd978d4"}, + {file = "sentry_sdk-2.55.0.tar.gz", hash = "sha256:3774c4d8820720ca4101548131b9c162f4c9426eb7f4d24aca453012a7470f69"}, +] + +[package.dependencies] +certifi = "*" +urllib3 = ">=1.26.11" + +[package.extras] +aiohttp = ["aiohttp (>=3.5)"] +anthropic = ["anthropic (>=0.16)"] +arq = ["arq (>=0.23)"] +asyncpg = ["asyncpg (>=0.23)"] +beam = ["apache-beam (>=2.12)"] +bottle = ["bottle (>=0.12.13)"] +celery = ["celery (>=3)"] +celery-redbeat = ["celery-redbeat (>=2)"] +chalice = ["chalice (>=1.16.0)"] +clickhouse-driver = ["clickhouse-driver (>=0.2.0)"] +django = ["django (>=1.8)"] +falcon = ["falcon (>=1.4)"] +fastapi = ["fastapi (>=0.79.0)"] +flask = ["blinker (>=1.1)", "flask (>=0.11)", "markupsafe"] +google-genai = ["google-genai (>=1.29.0)"] +grpcio = ["grpcio (>=1.21.1)", "protobuf (>=3.8.0)"] +http2 = ["httpcore[http2] (==1.*)"] +httpx = ["httpx (>=0.16.0)"] +huey = ["huey (>=2)"] +huggingface-hub = ["huggingface_hub (>=0.22)"] +langchain = ["langchain (>=0.0.210)"] +langgraph = ["langgraph (>=0.6.6)"] +launchdarkly = ["launchdarkly-server-sdk (>=9.8.0)"] +litellm = ["litellm (>=1.77.5)"] +litestar = ["litestar (>=2.0.0)"] +loguru = ["loguru (>=0.5)"] +mcp = ["mcp (>=1.15.0)"] +openai = ["openai (>=1.0.0)", "tiktoken (>=0.3.0)"] +openfeature = ["openfeature-sdk (>=0.7.1)"] +opentelemetry = ["opentelemetry-distro (>=0.35b0)"] +opentelemetry-experimental = ["opentelemetry-distro"] +opentelemetry-otlp = ["opentelemetry-distro[otlp] (>=0.35b0)"] +pure-eval = ["asttokens", "executing", "pure_eval"] +pydantic-ai = ["pydantic-ai (>=1.0.0)"] +pymongo = ["pymongo (>=3.1)"] +pyspark = ["pyspark (>=2.4.4)"] +quart = ["blinker (>=1.1)", "quart (>=0.16.1)"] +rq = ["rq (>=0.6)"] +sanic = ["sanic (>=0.8)"] +sqlalchemy = ["sqlalchemy (>=1.2)"] +starlette = ["starlette (>=0.19.1)"] +starlite = ["starlite (>=1.48)"] +statsig = ["statsig (>=0.55.3)"] +tornado = ["tornado (>=6)"] +unleash = ["UnleashClient (>=6.0.1)"] + [[package]] name = "serpyco-rs" version = "1.13.0" @@ -7045,4 +7109,4 @@ vector-db-based = ["cohere", "langchain_community", "langchain_core", "langchain [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.14" -content-hash = "b785d39f246498c8facd7854999dbdbfb78808489a09922dd3a1551be331ea7d" +content-hash = "3663fab9b73663a761ef32b74bed5b78685a116eed9cdc5baa7ec5faf63a76fd" diff --git a/pyproject.toml b/pyproject.toml index bcdab217b..c11c8a51a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,6 +89,7 @@ sqlalchemy = {version = "^2.0,!=2.0.36", optional = true } fastapi = { version = ">=0.116.1", optional = true } uvicorn = { version = ">=0.35.0", optional = true} ddtrace = { version = "^3", optional = true } +sentry-sdk = ">=2.0,<3.0" xmltodict = ">=0.13,<0.15" anyascii = "^0.3.2" whenever = ">=0.7.3,<0.9.0" @@ -235,7 +236,7 @@ exclude = [ DEP001 = [ # These are imported but not declared: "source_declarative_manifest", # Imported only in dynamic import tests, not in main code - "google" # Imported via google.cloud.secretmanager_v1 which is provided by google-cloud-secret-manager + "google", # Imported via google.cloud.secretmanager_v1 which is provided by google-cloud-secret-manager ] # DEP002: Project should not contain unused dependencies. diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index cf8250465..538e87687 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -4,7 +4,7 @@ import logging from pathlib import Path -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest @@ -245,3 +245,122 @@ def mock_read_text(self: Path) -> str: ): monitor.check_memory_usage() assert not caplog.records + + +# --------------------------------------------------------------------------- +# check_memory_usage — Sentry capture_message +# --------------------------------------------------------------------------- + + +def test_sentry_capture_message_called_on_high_memory() -> None: + """sentry_sdk.capture_message() should be called once when memory exceeds 90%.""" + mock_capture = MagicMock() + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as mock_sentry, + ): + mock_sentry.capture_message = mock_capture + monitor.check_memory_usage() + + mock_capture.assert_called_once() + call_args = mock_capture.call_args + assert "91%" in call_args[0][0] + assert call_args[1]["level"] == "warning" + + +def test_sentry_capture_message_only_once_per_sync() -> None: + """sentry_sdk.capture_message() should fire only once even if memory stays high.""" + mock_capture = MagicMock() + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as mock_sentry, + ): + mock_sentry.capture_message = mock_capture + monitor.check_memory_usage() + monitor.check_memory_usage() + monitor.check_memory_usage() + + mock_capture.assert_called_once() + + +def test_sentry_not_called_below_threshold() -> None: + """sentry_sdk.capture_message() should not be called when memory is below 90%.""" + mock_capture = MagicMock() + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_BELOW)), + patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as mock_sentry, + ): + mock_sentry.capture_message = mock_capture + monitor.check_memory_usage() + + mock_capture.assert_not_called() + + +def test_sentry_capture_message_includes_memory_details() -> None: + """sentry_sdk.capture_message() should include memory percentage and GB values.""" + mock_capture = MagicMock() + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as mock_sentry, + ): + mock_sentry.capture_message = mock_capture + monitor.check_memory_usage() + + mock_capture.assert_called_once() + msg = mock_capture.call_args[0][0] + assert "91%" in msg + assert "0.85 / 0.93 GB" in msg + + +def test_sentry_failure_does_not_crash_sync(caplog: pytest.LogCaptureFixture) -> None: + """A Sentry failure must never abort the sync — capture_message is best-effort.""" + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.DEBUG, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as mock_sentry, + ): + mock_sentry.capture_message = MagicMock(side_effect=RuntimeError("Sentry transport error")) + # Must not raise — observability should never break the sync + monitor.check_memory_usage() + + # The warning log should still be emitted even though Sentry failed + warning_records = [r for r in caplog.records if r.levelno == logging.WARNING] + assert len(warning_records) == 1 + assert "91%" in warning_records[0].message + + # The debug log should mention the Sentry failure + debug_records = [r for r in caplog.records if r.levelno == logging.DEBUG] + assert any("Failed to send high-memory warning to Sentry" in r.message for r in debug_records) + + # _sentry_alerted should NOT be set, so a retry is possible on the next check + assert not monitor._sentry_alerted + + +def test_sentry_retries_after_transient_failure() -> None: + """After a transient Sentry failure, the next check should retry capture_message.""" + monitor = MemoryMonitor(check_interval=1) + mock_capture = MagicMock(side_effect=[RuntimeError("transient"), None]) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch("airbyte_cdk.utils.memory_monitor.sentry_sdk") as mock_sentry, + ): + mock_sentry.capture_message = mock_capture + # First call: Sentry raises, flag stays False + monitor.check_memory_usage() + assert not monitor._sentry_alerted + # Second call: Sentry succeeds, flag flips True + monitor.check_memory_usage() + assert monitor._sentry_alerted + + assert mock_capture.call_count == 2