diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 82136b65f..7d97fc853 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -279,8 +279,15 @@ def read( # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) + + # DO NOT MERGE: intentional memory leak to test graceful OOM shutdown. + # Each consumed record is duplicated 500x into a growing list to inflate memory quickly. + _leaked_records: list[Any] = [] + for message in self.source.read(self.logger, config, catalog, state): yield self.handle_record_counts(message, stream_message_counter) + if message.type == Type.RECORD: + _leaked_records.extend([message.record] * 500) try: self._memory_monitor.check_memory_usage() except AirbyteTracedException: