From 0dae8a35d2dd12d316747bd860c9d9527cce3d06 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 30 Mar 2026 16:31:56 +0000 Subject: [PATCH 1/3] chore: initialize feature branch 2 Co-Authored-By: patrick.nilan@airbyte.io From 5a297580d09979bcf4380664e1446b93bd678945 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 30 Mar 2026 16:40:54 +0000 Subject: [PATCH 2/3] DO NOT MERGE: inflate cgroup memory via file-backed mmap to test dual-condition monitor Co-Authored-By: patrick.nilan@airbyte.io --- airbyte_cdk/entrypoint.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 82136b65f..d9e15e1d2 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -7,6 +7,8 @@ import ipaddress import json import logging +import mmap +import os import os.path import socket import sys @@ -279,8 +281,27 @@ def read( # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) + + # DO NOT MERGE: inflate cgroup memory via file-backed shared mmap pages. + # This grows container memory usage (cgroup) without increasing the Python + # process's anonymous RSS (RssAnon), testing that the dual-condition + # memory monitor does NOT false-positive on non-process memory pressure. + _cache_mmaps: list[mmap.mmap] = [] + _cache_fds: list[int] = [] + _CHUNK_SIZE = 10 * 1024 * 1024 # 10 MB per record message + for message in self.source.read(self.logger, config, catalog, state): yield self.handle_record_counts(message, stream_message_counter) + if message.type == Type.RECORD: + fd = os.open( + tempfile.mktemp(prefix="cache_pressure_", dir="/dev/shm"), + os.O_RDWR | os.O_CREAT, + ) + os.ftruncate(fd, _CHUNK_SIZE) + mm = mmap.mmap(fd, _CHUNK_SIZE, access=mmap.ACCESS_WRITE) + mm.write(b"\x00" * _CHUNK_SIZE) # fault the pages into resident memory + _cache_mmaps.append(mm) + _cache_fds.append(fd) try: self._memory_monitor.check_memory_usage() except AirbyteTracedException: From 497050564325d86b9faf7b02f59028f50f0e58ce Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:51:46 +0000 Subject: [PATCH 3/3] fix: use /tmp instead of /dev/shm for cache pressure mmap to avoid SIGBUS /dev/shm is a tmpfs with a hard size limit (typically 64 MB in Docker containers). After ~6 records at 10 MB each, the filesystem fills up and the next mm.write() triggers SIGBUS (exit code 135). Switching to /tmp (overlay fs) avoids the size limit. File-backed mmap pages on overlay fs still count toward cgroup memory and go into RssFile (not RssAnon), preserving the dual-condition test. Co-Authored-By: patrick.nilan@airbyte.io --- airbyte_cdk/entrypoint.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index d9e15e1d2..cf8faf41f 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -286,6 +286,11 @@ def read( # This grows container memory usage (cgroup) without increasing the Python # process's anonymous RSS (RssAnon), testing that the dual-condition # memory monitor does NOT false-positive on non-process memory pressure. + # + # Uses /tmp (overlay fs) instead of /dev/shm (tmpfs) to avoid SIGBUS + # when /dev/shm fills up (typically 64 MB limit in Docker containers). + # File-backed mmap pages on overlay fs still count toward cgroup memory + # and go into RssFile (not RssAnon), preserving the dual-condition test. _cache_mmaps: list[mmap.mmap] = [] _cache_fds: list[int] = [] _CHUNK_SIZE = 10 * 1024 * 1024 # 10 MB per record message @@ -294,7 +299,7 @@ def read( yield self.handle_record_counts(message, stream_message_counter) if message.type == Type.RECORD: fd = os.open( - tempfile.mktemp(prefix="cache_pressure_", dir="/dev/shm"), + tempfile.mktemp(prefix="cache_pressure_", dir="/tmp"), os.O_RDWR | os.O_CREAT, ) os.ftruncate(fd, _CHUNK_SIZE)