diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 82136b65f..d9e15e1d2 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -7,6 +7,8 @@ import ipaddress import json import logging +import mmap +import os import os.path import socket import sys @@ -279,8 +281,27 @@ def read( # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) + + # DO NOT MERGE: inflate cgroup memory via file-backed shared mmap pages. + # This grows container memory usage (cgroup) without increasing the Python + # process's anonymous RSS (RssAnon), testing that the dual-condition + # memory monitor does NOT false-positive on non-process memory pressure. + _cache_mmaps: list[mmap.mmap] = [] + _cache_fds: list[int] = [] + _CHUNK_SIZE = 10 * 1024 * 1024 # 10 MB per record message + for message in self.source.read(self.logger, config, catalog, state): yield self.handle_record_counts(message, stream_message_counter) + if message.type == Type.RECORD: + fd = os.open( + tempfile.mktemp(prefix="cache_pressure_", dir="/dev/shm"), + os.O_RDWR | os.O_CREAT, + ) + os.ftruncate(fd, _CHUNK_SIZE) + mm = mmap.mmap(fd, _CHUNK_SIZE, access=mmap.ACCESS_WRITE) + mm.write(b"\x00" * _CHUNK_SIZE) # fault the pages into resident memory + _cache_mmaps.append(mm) + _cache_fds.append(fd) try: self._memory_monitor.check_memory_usage() except AirbyteTracedException: