From d3056e3f4677351529ddb45f16040fff594113e2 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 30 Mar 2026 16:31:26 +0000 Subject: [PATCH 1/2] chore: initialize feature branch 1 Co-Authored-By: patrick.nilan@airbyte.io From 5aac7208e016c135a68ab91931d12bee49d3aa29 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 30 Mar 2026 16:40:03 +0000 Subject: [PATCH 2/2] DO NOT MERGE: add intentional memory leak to test graceful OOM shutdown Co-Authored-By: patrick.nilan@airbyte.io --- airbyte_cdk/entrypoint.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 82136b65f..7d97fc853 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -279,8 +279,15 @@ def read( # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) + + # DO NOT MERGE: intentional memory leak to test graceful OOM shutdown. + # Each consumed record is duplicated 500x into a growing list to inflate memory quickly. + _leaked_records: list[Any] = [] + for message in self.source.read(self.logger, config, catalog, state): yield self.handle_record_counts(message, stream_message_counter) + if message.type == Type.RECORD: + _leaked_records.extend([message.record] * 500) try: self._memory_monitor.check_memory_usage() except AirbyteTracedException: