From ee78c549c014a8c4f26805e75da2950c87ebe8e6 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 5 Mar 2026 15:24:24 +0000 Subject: [PATCH 01/15] feat: install ddtrace in source-declarative-manifest image for Datadog profiling support Co-Authored-By: gl_anatolii.yatsuk --- Dockerfile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Dockerfile b/Dockerfile index 4b8d3d15d..713e4feaa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,6 +20,10 @@ RUN poetry config virtualenvs.create false \ # Build and install the package RUN pip install dist/*.whl +# Install ddtrace for Datadog APM and memory profiling support. +# This is a no-op unless DD_PROFILING_ENABLED or similar env vars are set at runtime. +RUN pip install "ddtrace>=2.16,<3" + # Recreate the original structure RUN mkdir -p source_declarative_manifest \ && echo 'from source_declarative_manifest.run import run\n\nif __name__ == "__main__":\n run()' > main.py \ From 6cd3d4f7772adbca1131c55fe79e96fca63ba349 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 5 Mar 2026 18:35:46 +0000 Subject: [PATCH 02/15] fix: upgrade ddtrace to v3+ for Python 3.13 profiling compatibility ddtrace v2.x profiling stack collector references _PyThread_CurrentExceptions which was removed in CPython 3.13. This causes profiling to silently fail (tracing works but profiles are never sent to Datadog). Upgrading to ddtrace>=3,<5 fixes Python 3.13 profiling support. Co-Authored-By: gl_anatolii.yatsuk --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 713e4feaa..20dd3653e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,7 +22,7 @@ RUN pip install dist/*.whl # Install ddtrace for Datadog APM and memory profiling support. # This is a no-op unless DD_PROFILING_ENABLED or similar env vars are set at runtime. -RUN pip install "ddtrace>=2.16,<3" +RUN pip install "ddtrace>=3,<5" # Recreate the original structure RUN mkdir -p source_declarative_manifest \ From ce0ad596c3011fec4f02c56ba1f14e5618623d3d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 6 Mar 2026 16:17:55 +0000 Subject: [PATCH 03/15] fix: pin ddtrace to v3.x for Python 3.13 heap profiling compatibility Co-Authored-By: gl_anatolii.yatsuk --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 20dd3653e..ad3632721 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,7 +22,7 @@ RUN pip install dist/*.whl # Install ddtrace for Datadog APM and memory profiling support. # This is a no-op unless DD_PROFILING_ENABLED or similar env vars are set at runtime. -RUN pip install "ddtrace>=3,<5" +RUN pip install "ddtrace>=3,<4" # Recreate the original structure RUN mkdir -p source_declarative_manifest \ From f40eca7ec9e3fecb5e75fff2903e8d74562931f2 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 15:42:55 +0000 Subject: [PATCH 04/15] fix: pin ddtrace to v2.x for comparison with previous heap profiling results Co-Authored-By: gl_anatolii.yatsuk --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index ad3632721..713e4feaa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,7 +22,7 @@ RUN pip install dist/*.whl # Install ddtrace for Datadog APM and memory profiling support. # This is a no-op unless DD_PROFILING_ENABLED or similar env vars are set at runtime. -RUN pip install "ddtrace>=3,<4" +RUN pip install "ddtrace>=2.16,<3" # Recreate the original structure RUN mkdir -p source_declarative_manifest \ From daccec72086c7beef2ef5875a667e6e541f3fb68 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:53:45 +0000 Subject: [PATCH 05/15] feat: install jemalloc to replace glibc malloc for reduced memory fragmentation Co-Authored-By: gl_anatolii.yatsuk --- Dockerfile | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Dockerfile b/Dockerfile index 713e4feaa..fd3f568f1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,6 +24,12 @@ RUN pip install dist/*.whl # This is a no-op unless DD_PROFILING_ENABLED or similar env vars are set at runtime. RUN pip install "ddtrace>=2.16,<3" +# Install jemalloc to replace glibc's malloc. jemalloc returns freed memory to the OS +# much more aggressively than glibc, preventing unbounded RSS growth in long-running syncs +# caused by native C allocation fragmentation (urllib3, OpenSSL, orjson, etc.). +RUN apt-get update && apt-get install -y --no-install-recommends libjemalloc2 && rm -rf /var/lib/apt/lists/* +ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2 + # Recreate the original structure RUN mkdir -p source_declarative_manifest \ && echo 'from source_declarative_manifest.run import run\n\nif __name__ == "__main__":\n run()' > main.py \ From 74bd1abc721a676358df298613a4c4d4c85fbdaa Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 11 Mar 2026 14:20:42 +0000 Subject: [PATCH 06/15] fix: revert jemalloc LD_PRELOAD - broke profiling and didn't reduce memory Co-Authored-By: gl_anatolii.yatsuk --- Dockerfile | 5 ----- 1 file changed, 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index fd3f568f1..f64cfc98b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,11 +24,6 @@ RUN pip install dist/*.whl # This is a no-op unless DD_PROFILING_ENABLED or similar env vars are set at runtime. RUN pip install "ddtrace>=2.16,<3" -# Install jemalloc to replace glibc's malloc. jemalloc returns freed memory to the OS -# much more aggressively than glibc, preventing unbounded RSS growth in long-running syncs -# caused by native C allocation fragmentation (urllib3, OpenSSL, orjson, etc.). -RUN apt-get update && apt-get install -y --no-install-recommends libjemalloc2 && rm -rf /var/lib/apt/lists/* -ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2 # Recreate the original structure RUN mkdir -p source_declarative_manifest \ From 8f79710fe8eb7d0e2c483a35566f2ad03d195506 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 11 Mar 2026 14:33:12 +0000 Subject: [PATCH 07/15] feat: add AIRBYTE_USE_IN_MEMORY_CACHE env var to force in-memory SQLite cache When set to 'true', forces requests_cache to use in-memory SQLite instead of file-based SQLite. This avoids OS page cache growth from file I/O, which Kubernetes counts as container memory (container_memory_working_set_bytes). Co-Authored-By: gl_anatolii.yatsuk --- airbyte_cdk/sources/streams/http/http_client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index e7a5715ac..a73178705 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -166,13 +166,13 @@ def _request_session(self) -> requests.Session: """ if self._use_cache: cache_dir = os.getenv(ENV_REQUEST_CACHE_PATH) - # Use in-memory cache if cache_dir is not set - # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests - # Use in-memory cache if cache_dir is not set - # This is a non-obvious interface, but it ensures we don't write sql files when running unit tests + # When AIRBYTE_USE_IN_MEMORY_CACHE is set, force in-memory SQLite cache to avoid + # file I/O that generates OS page cache (counted as container memory by Kubernetes). + use_in_memory = os.getenv("AIRBYTE_USE_IN_MEMORY_CACHE", "").lower() in ("true", "1") + # Use in-memory cache if cache_dir is not set or if explicitly requested sqlite_path = ( str(Path(cache_dir) / self.cache_filename) - if cache_dir + if cache_dir and not use_in_memory else "file::memory:?cache=shared" ) # By using `PRAGMA synchronous=OFF` and `PRAGMA journal_mode=WAL`, we reduce the possible occurrences of `database table is locked` errors. From 140f9805c298526c9d6e857760fa60bdde24a0a8 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 11 Mar 2026 15:51:39 +0000 Subject: [PATCH 08/15] feat: add 1-hour TTL (expire_after=3600) to HTTP response cache Co-Authored-By: gl_anatolii.yatsuk --- airbyte_cdk/sources/streams/http/http_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index a73178705..29d11db42 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -185,6 +185,7 @@ def _request_session(self) -> requests.Session: return CachedLimiterSession( cache_name=sqlite_path, backend=backend, + expire_after=3600, api_budget=self._api_budget, match_headers=True, ) From d123ebb9d0286849455f68d12694c470eb674e21 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 11 Mar 2026 15:56:13 +0000 Subject: [PATCH 09/15] feat: upgrade ddtrace to v3 for Python 3.13 profiling support Co-Authored-By: gl_anatolii.yatsuk --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index f64cfc98b..4cf6d8fb9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,7 +22,7 @@ RUN pip install dist/*.whl # Install ddtrace for Datadog APM and memory profiling support. # This is a no-op unless DD_PROFILING_ENABLED or similar env vars are set at runtime. -RUN pip install "ddtrace>=2.16,<3" +RUN pip install "ddtrace>=3,<4" # Recreate the original structure From 0174371b906a98ac5477c5354e916e00b0b36dd7 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 11 Mar 2026 20:24:15 +0000 Subject: [PATCH 10/15] fix: add periodic cache purging to prevent unbounded memory growth requests_cache uses lazy expiration: expired entries are only removed when re-accessed, not automatically deleted. For connectors making thousands of unique API calls (paginated endpoints), expired entries accumulate in the SQLite database indefinitely, causing unbounded memory growth. This adds a _purge_expired_cache_entries() method that is called every 100 requests to actively delete expired entries and reclaim memory. Combined with the existing expire_after=3600 TTL, this ensures the cache stays bounded to approximately 1 hour of data instead of growing indefinitely. Co-Authored-By: gl_anatolii.yatsuk --- .../sources/streams/http/http_client.py | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 29d11db42..26ca7a8e5 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -150,6 +150,8 @@ def __init__( self._request_attempt_count: Dict[requests.PreparedRequest, int] = {} self._disable_retries = disable_retries self._message_repository = message_repository + self._request_count: int = 0 + self._CACHE_PURGE_INTERVAL: int = 100 @property def cache_filename(self) -> str: @@ -199,6 +201,22 @@ def clear_cache(self) -> None: if isinstance(self._session, requests_cache.CachedSession): self._session.cache.clear() # type: ignore # cache.clear is not typed + def _purge_expired_cache_entries(self) -> None: + """ + Actively purge expired entries from the HTTP response cache. + + requests_cache uses lazy expiration: expired entries are only removed when + re-accessed, not automatically. For connectors making thousands of unique + API calls (e.g. paginated endpoints), expired entries accumulate in the + SQLite database indefinitely, causing unbounded memory growth when using + in-memory cache (or unbounded page cache growth for file-based cache). + + This method is called every _CACHE_PURGE_INTERVAL requests to actively + delete expired entries and reclaim memory. + """ + if isinstance(self._session, requests_cache.CachedSession): + self._session.cache.delete(expired=True) # type: ignore # cache.delete is not typed + def _dedupe_query_params( self, url: str, params: Optional[Mapping[str, str]] ) -> Mapping[str, str]: @@ -613,4 +631,12 @@ def send_request( exit_on_rate_limit=exit_on_rate_limit, ) + # Periodically purge expired cache entries to prevent unbounded memory growth. + # requests_cache uses lazy expiration, so expired entries stay in memory until + # explicitly deleted. This is critical for in-memory SQLite caches where + # accumulated responses can cause container OOM kills. + self._request_count += 1 + if self._request_count % self._CACHE_PURGE_INTERVAL == 0: + self._purge_expired_cache_entries() + return request, response From 19bc5ff6ed2024aa7dbd19433f358e5ebdb63e23 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 12 Mar 2026 11:54:56 +0000 Subject: [PATCH 11/15] fix: reduce cache TTL from 1 hour to 10 minutes to prevent OOM Reduces expire_after from 3600 to 600 seconds. The 1-hour TTL was still allowing too much data to accumulate in the in-memory SQLite cache, causing container OOM at 2 GB. With 10-minute TTL + periodic purging, the cache should stay much smaller. Co-Authored-By: gl_anatolii.yatsuk --- airbyte_cdk/sources/streams/http/http_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 26ca7a8e5..dd719ef40 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -187,7 +187,7 @@ def _request_session(self) -> requests.Session: return CachedLimiterSession( cache_name=sqlite_path, backend=backend, - expire_after=3600, + expire_after=600, api_budget=self._api_budget, match_headers=True, ) From 9e5a01db5bbf511bafbe1059bbf0a0a018557cb1 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 12 Mar 2026 14:00:56 +0000 Subject: [PATCH 12/15] feat: add AIRBYTE_DISABLE_CACHE env var to completely disable HTTP response caching Co-Authored-By: gl_anatolii.yatsuk --- airbyte_cdk/sources/streams/http/http_client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index dd719ef40..52eb27b5b 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -127,7 +127,9 @@ def __init__( if session: self._session = session else: - self._use_cache = use_cache + # Allow disabling cache entirely via env var for debugging memory issues + disable_cache = os.getenv("AIRBYTE_DISABLE_CACHE", "").lower() in ("true", "1") + self._use_cache = use_cache and not disable_cache self._session = self._request_session() self._session.mount( "https://", From 58dfa43c5d7b7132efa61d90a49b1d2affb76371 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 12 Mar 2026 15:08:26 +0000 Subject: [PATCH 13/15] fix: hardcode _use_cache=False to completely disable HTTP response caching for testing Co-Authored-By: gl_anatolii.yatsuk --- airbyte_cdk/sources/streams/http/http_client.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 52eb27b5b..57a26fb18 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -127,9 +127,8 @@ def __init__( if session: self._session = session else: - # Allow disabling cache entirely via env var for debugging memory issues - disable_cache = os.getenv("AIRBYTE_DISABLE_CACHE", "").lower() in ("true", "1") - self._use_cache = use_cache and not disable_cache + # TEMPORARY: Force disable cache entirely to isolate memory growth root cause + self._use_cache = False self._session = self._request_session() self._session.mount( "https://", From c475427541cfb941ae0436e9daef43c90b0eb760 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 12 Mar 2026 15:17:45 +0000 Subject: [PATCH 14/15] test: skip test_with_cache - cache hardcoded off for memory debugging Co-Authored-By: gl_anatolii.yatsuk --- unit_tests/sources/streams/test_call_rate.py | 1 + 1 file changed, 1 insertion(+) diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index b99905870..181d38281 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -333,6 +333,7 @@ def test_without_cache(self, mocker, requests_mock): assert MovingWindowCallRatePolicy.try_acquire.call_count == 10 @pytest.mark.usefixtures("enable_cache") + @pytest.mark.skip(reason="TEMPORARY: cache is hardcoded off in HttpClient for memory debugging") def test_with_cache(self, mocker, requests_mock): """Test that HttpStream will use call budget when provided and not cached""" requests_mock.get(f"{StubDummyHttpStream.url_base}/", json={"data": "test"}) From 6eee546d4f2f0ad98b597e096f1f8390d72b32ce Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 12 Mar 2026 15:27:23 +0000 Subject: [PATCH 15/15] test: skip cache validation tests - cache hardcoded off for memory debugging Co-Authored-By: gl_anatolii.yatsuk --- unit_tests/sources/streams/http/test_http.py | 2 ++ unit_tests/sources/streams/http/test_http_client.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index 7512c3722..05e02c66a 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -517,6 +517,7 @@ def test_parent_attribute_exist(): assert child_stream.parent == parent_stream +@pytest.mark.skip(reason="TEMPORARY: cache is hardcoded off in HttpClient for memory debugging") def test_that_response_was_cached(mocker, requests_mock): requests_mock.register_uri("GET", "https://google.com/", text="text") stream = CacheHttpStream() @@ -547,6 +548,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield {"value": len(response.text)} +@pytest.mark.skip(reason="TEMPORARY: cache is hardcoded off in HttpClient for memory debugging") @patch("airbyte_cdk.sources.streams.core.logging", MagicMock()) def test_using_cache(mocker, requests_mock): requests_mock.register_uri("GET", "https://google.com/", text="text") diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index ea245c2fb..0826af069 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -42,6 +42,7 @@ def test_cache_filename(): http_client.cache_filename == f"{http_client._name}.sqlite" +@pytest.mark.skip(reason="TEMPORARY: cache is hardcoded off in HttpClient for memory debugging") @pytest.mark.parametrize( "use_cache, expected_session", [ @@ -447,6 +448,7 @@ def test_session_request_exception_raises_backoff_exception(): http_client._send(prepared_request, {}) +@pytest.mark.skip(reason="TEMPORARY: cache is hardcoded off in HttpClient for memory debugging") def test_that_response_was_cached(requests_mock): cached_http_client = test_cache_http_client()