From ef5d8f687afc30fca0a48d01d0645f010e01a283 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 14:43:01 +0000 Subject: [PATCH 01/12] feat: non-blocking stdout writes to prevent deadlock on pipe backpressure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the Airbyte platform pauses reading from the source container's stdout pipe, the main thread's print() call blocks in an OS-level write() syscall. This stalls the record queue consumer, filling the bounded queue and blocking all worker threads — a complete deadlock. This change replaces blocking print() with non-blocking os.write() using select() to wait for the pipe to become writable. The main thread stays in a Python-level retry loop instead of getting stuck in a kernel syscall. When the platform resumes reading, select() returns, the write completes, and the pipeline resumes automatically. Key properties: - Memory stays bounded (queue maxsize=10,000 unchanged) - No deadlock (main thread never stuck in blocking syscall) - Automatic recovery when platform resumes reading - 600s watchdog raises RuntimeError if pipe stays blocked Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 77 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 72 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index d6c92f9fa..45e4c542e 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -7,10 +7,13 @@ import ipaddress import json import logging +import os import os.path +import select import socket import sys import tempfile +import time from collections import defaultdict from functools import wraps from typing import Any, DefaultDict, Iterable, List, Mapping, Optional @@ -374,13 +377,77 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]: def launch(source: Source, args: List[str]) -> None: source_entrypoint = AirbyteEntrypoint(source) parsed_args = source_entrypoint.parse_args(args) - # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs - # Refer to: https://github.com/airbytehq/oncall/issues/6235 with PRINT_BUFFER: - for message in source_entrypoint.run(parsed_args): - # simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and - # the other for the break line. Adding `\n` to the message ensure that both are printed at the same time + _nonblocking_write_to_stdout(source_entrypoint.run(parsed_args)) + + +def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: + """Write messages to stdout using non-blocking I/O to prevent deadlocks. + + When the Airbyte platform pauses reading from the source container's + stdout pipe, a blocking ``write()`` stalls the main thread. Since the + main thread is also responsible for draining the internal record queue, + this causes a cascading deadlock: the queue fills, worker threads block + on ``queue.put()``, and the entire process hangs. + + This function sets stdout to non-blocking mode so that ``os.write()`` + raises ``BlockingIOError`` instead of blocking when the pipe buffer is + full. It then uses ``select()`` to wait (with a timeout) until the fd + is writable again. The main thread remains in a Python-level retry + loop it controls, so it never gets stuck in a kernel-level syscall. + + Memory stays bounded because the upstream record queue keeps its + default bounded size (10,000 items). When stdout is blocked the main + thread pauses here, the queue fills naturally, and worker threads + block on ``queue.put()`` with their own timeouts. When the platform + resumes reading, ``select()`` returns, the write completes, the main + thread resumes draining the queue, and workers unblock automatically. + """ + stdout_fd = sys.stdout.fileno() + original_blocking = os.get_blocking(stdout_fd) + + try: + os.set_blocking(stdout_fd, False) + except OSError: + # Fallback: if we cannot set non-blocking (e.g. redirected to + # a file or in a test environment), just write normally. + for message in messages: print(f"{message}\n", end="") + return + + try: + for message in messages: + data = f"{message}\n".encode() + _write_all_nonblocking(stdout_fd, data) + finally: + try: + os.set_blocking(stdout_fd, original_blocking) + except OSError: + pass + + +def _write_all_nonblocking(fd: int, data: bytes) -> None: + """Write all bytes to a non-blocking fd, retrying with select on EAGAIN.""" + total_written = 0 + last_progress = time.monotonic() + + while total_written < len(data): + try: + written = os.write(fd, data[total_written:]) + total_written += written + last_progress = time.monotonic() + except BlockingIOError: + # Pipe buffer is full. Wait up to 1 second for it to become + # writable, then retry. The short timeout keeps the main + # thread responsive and allows periodic stall detection. + _, writable, _ = select.select([], [fd], [], 1.0) + if not writable: + elapsed = time.monotonic() - last_progress + if elapsed > 600: + raise RuntimeError( + f"stdout pipe blocked for {elapsed:.0f}s with no progress. " + "The platform is not reading from the source container pipe." + ) def _init_internal_request_filter() -> None: From 0e3212402ad3685e93507cf829531d5875facd9d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 14:49:10 +0000 Subject: [PATCH 02/12] fix: handle UnsupportedOperation from fileno() in test environments and log restore failures Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 45e4c542e..8e5972425 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -403,14 +403,14 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: resumes reading, ``select()`` returns, the write completes, the main thread resumes draining the queue, and workers unblock automatically. """ - stdout_fd = sys.stdout.fileno() - original_blocking = os.get_blocking(stdout_fd) - try: + stdout_fd = sys.stdout.fileno() + original_blocking = os.get_blocking(stdout_fd) os.set_blocking(stdout_fd, False) except OSError: - # Fallback: if we cannot set non-blocking (e.g. redirected to - # a file or in a test environment), just write normally. + # Fallback: if we cannot set non-blocking (e.g. pytest captures + # stdout with a StringIO that has no fileno, or the fd does not + # support non-blocking mode), just write normally. for message in messages: print(f"{message}\n", end="") return @@ -423,7 +423,7 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: try: os.set_blocking(stdout_fd, original_blocking) except OSError: - pass + logger.debug("Failed to restore stdout blocking mode", exc_info=True) def _write_all_nonblocking(fd: int, data: bytes) -> None: From e676457fdd0051742478d7c58a72741ce6798e07 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:06:06 +0000 Subject: [PATCH 03/12] fix: use sys.__stdout__ to get real fd when PRINT_BUFFER replaces sys.stdout Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 8e5972425..ad8300a67 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -403,13 +403,21 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: resumes reading, ``select()`` returns, the write completes, the main thread resumes draining the queue, and workers unblock automatically. """ + # Use the *real* stdout (sys.__stdout__) rather than sys.stdout, + # because PRINT_BUFFER replaces sys.stdout with a StringIO wrapper + # that has no fileno(). + real_stdout = sys.__stdout__ + if real_stdout is None or not hasattr(real_stdout, "fileno"): + for message in messages: + print(f"{message}\n", end="") + return + try: - stdout_fd = sys.stdout.fileno() + stdout_fd = real_stdout.fileno() original_blocking = os.get_blocking(stdout_fd) os.set_blocking(stdout_fd, False) except OSError: - # Fallback: if we cannot set non-blocking (e.g. pytest captures - # stdout with a StringIO that has no fileno, or the fd does not + # Fallback: if we cannot set non-blocking (e.g. the fd does not # support non-blocking mode), just write normally. for message in messages: print(f"{message}\n", end="") From 1b95a875e46843315eb06af4f3089404f273b587 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:12:10 +0000 Subject: [PATCH 04/12] fix: fall back to print() when sys.stdout is replaced (capsys/PRINT_BUFFER) Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index ad8300a67..54ddc4345 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -403,11 +403,12 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: resumes reading, ``select()`` returns, the write completes, the main thread resumes draining the queue, and workers unblock automatically. """ - # Use the *real* stdout (sys.__stdout__) rather than sys.stdout, - # because PRINT_BUFFER replaces sys.stdout with a StringIO wrapper - # that has no fileno(). + # Only use non-blocking I/O when stdout is the real file descriptor. + # In test environments (pytest capsys) or when PRINT_BUFFER is active, + # sys.stdout is replaced with a wrapper. Writing to sys.__stdout__ + # via os.write() would bypass the capture, so fall back to print(). real_stdout = sys.__stdout__ - if real_stdout is None or not hasattr(real_stdout, "fileno"): + if real_stdout is None or not hasattr(real_stdout, "fileno") or sys.stdout is not real_stdout: for message in messages: print(f"{message}\n", end="") return From 4d2c36f9a9e719b2048816bd9c6463a71aae5266 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:13:50 +0000 Subject: [PATCH 05/12] fix: detect capsys/redirected stdout via fileno comparison, remove PRINT_BUFFER wrapper Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 45 ++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 54ddc4345..cc590fd3b 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -377,8 +377,7 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]: def launch(source: Source, args: List[str]) -> None: source_entrypoint = AirbyteEntrypoint(source) parsed_args = source_entrypoint.parse_args(args) - with PRINT_BUFFER: - _nonblocking_write_to_stdout(source_entrypoint.run(parsed_args)) + _nonblocking_write_to_stdout(source_entrypoint.run(parsed_args)) def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: @@ -403,20 +402,42 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: resumes reading, ``select()`` returns, the write completes, the main thread resumes draining the queue, and workers unblock automatically. """ - # Only use non-blocking I/O when stdout is the real file descriptor. - # In test environments (pytest capsys) or when PRINT_BUFFER is active, - # sys.stdout is replaced with a wrapper. Writing to sys.__stdout__ - # via os.write() would bypass the capture, so fall back to print(). + # We need to write to the *real* stdout fd for non-blocking I/O. + # However, in test environments (pytest capsys) or other wrappers, + # sys.stdout may have been replaced. If sys.stdout.fileno() fails + # or doesn't match sys.__stdout__.fileno(), something is capturing + # output and we must fall back to print() so it goes through the + # capture layer. + try: + current_fd = sys.stdout.fileno() + except (OSError, AttributeError, ValueError): + # capsys, PRINT_BUFFER, or other wrapper — no real fd available. + for message in messages: + print(f"{message}\n", end="") + return + real_stdout = sys.__stdout__ - if real_stdout is None or not hasattr(real_stdout, "fileno") or sys.stdout is not real_stdout: + if real_stdout is None or not hasattr(real_stdout, "fileno"): + for message in messages: + print(f"{message}\n", end="") + return + + try: + real_fd = real_stdout.fileno() + except (OSError, AttributeError, ValueError): + for message in messages: + print(f"{message}\n", end="") + return + + if current_fd != real_fd: + # stdout has been redirected; fall back to print(). for message in messages: print(f"{message}\n", end="") return try: - stdout_fd = real_stdout.fileno() - original_blocking = os.get_blocking(stdout_fd) - os.set_blocking(stdout_fd, False) + original_blocking = os.get_blocking(real_fd) + os.set_blocking(real_fd, False) except OSError: # Fallback: if we cannot set non-blocking (e.g. the fd does not # support non-blocking mode), just write normally. @@ -427,10 +448,10 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: try: for message in messages: data = f"{message}\n".encode() - _write_all_nonblocking(stdout_fd, data) + _write_all_nonblocking(real_fd, data) finally: try: - os.set_blocking(stdout_fd, original_blocking) + os.set_blocking(real_fd, original_blocking) except OSError: logger.debug("Failed to restore stdout blocking mode", exc_info=True) From 649e0e051814d805759ad322ddcaa9c0e5318c3f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 18:02:50 +0000 Subject: [PATCH 06/12] fix: use dedicated writer thread instead of non-blocking fd to avoid global BlockingIOError Setting os.set_blocking(fd, False) is a process-wide change that causes BlockingIOError in other threads (logging via print_buffer, worker threads). Instead, use a dedicated stdout-writer thread that does blocking os.write() calls. If the pipe is full, only the writer thread stalls - the main thread continues draining the record queue. Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 132 +++++++++++++++++++------------------- 1 file changed, 67 insertions(+), 65 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index cc590fd3b..b500c7e17 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -9,10 +9,11 @@ import logging import os import os.path -import select +import queue import socket import sys import tempfile +import threading import time from collections import defaultdict from functools import wraps @@ -381,37 +382,32 @@ def launch(source: Source, args: List[str]) -> None: def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: - """Write messages to stdout using non-blocking I/O to prevent deadlocks. + """Write messages to stdout via a dedicated writer thread to prevent deadlocks. When the Airbyte platform pauses reading from the source container's - stdout pipe, a blocking ``write()`` stalls the main thread. Since the - main thread is also responsible for draining the internal record queue, - this causes a cascading deadlock: the queue fills, worker threads block - on ``queue.put()``, and the entire process hangs. - - This function sets stdout to non-blocking mode so that ``os.write()`` - raises ``BlockingIOError`` instead of blocking when the pipe buffer is - full. It then uses ``select()`` to wait (with a timeout) until the fd - is writable again. The main thread remains in a Python-level retry - loop it controls, so it never gets stuck in a kernel-level syscall. - - Memory stays bounded because the upstream record queue keeps its - default bounded size (10,000 items). When stdout is blocked the main - thread pauses here, the queue fills naturally, and worker threads - block on ``queue.put()`` with their own timeouts. When the platform - resumes reading, ``select()`` returns, the write completes, the main - thread resumes draining the queue, and workers unblock automatically. + stdout pipe, a blocking ``write()`` in the main thread stalls message + processing. Since the main thread is also responsible for draining the + internal record queue, this causes a cascading deadlock: the queue + fills, worker threads block on ``queue.put()``, and the entire process + hangs. + + This function decouples stdout writing from the main thread by routing + messages through a bounded internal queue to a dedicated writer thread. + The writer thread performs normal blocking ``os.write()`` calls; if the + pipe is full, only the writer thread stalls — the main thread continues + iterating the message generator (which drains the record queue). + + When the internal write queue fills (because the writer is blocked on + the pipe), the main thread retries ``queue.put()`` with a 1-second + timeout. A watchdog detects if no message has been accepted for 600 + seconds and raises ``RuntimeError`` to terminate the process cleanly. """ - # We need to write to the *real* stdout fd for non-blocking I/O. - # However, in test environments (pytest capsys) or other wrappers, - # sys.stdout may have been replaced. If sys.stdout.fileno() fails - # or doesn't match sys.__stdout__.fileno(), something is capturing - # output and we must fall back to print() so it goes through the - # capture layer. + # In test environments (pytest capsys) or wrappers like PRINT_BUFFER, + # sys.stdout may have been replaced. Detect this via fileno() and + # fall back to print() so output goes through the capture layer. try: current_fd = sys.stdout.fileno() except (OSError, AttributeError, ValueError): - # capsys, PRINT_BUFFER, or other wrapper — no real fd available. for message in messages: print(f"{message}\n", end="") return @@ -430,54 +426,60 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: return if current_fd != real_fd: - # stdout has been redirected; fall back to print(). for message in messages: print(f"{message}\n", end="") return - try: - original_blocking = os.get_blocking(real_fd) - os.set_blocking(real_fd, False) - except OSError: - # Fallback: if we cannot set non-blocking (e.g. the fd does not - # support non-blocking mode), just write normally. - for message in messages: - print(f"{message}\n", end="") - return + # Bounded queue decouples the main thread from stdout I/O. + # The writer thread does blocking writes; if the pipe is full only the + # writer stalls — the main thread keeps draining the record queue. + _WRITE_QUEUE_SIZE = 1000 + _WATCHDOG_TIMEOUT_S = 600 + write_queue: queue.Queue[Optional[bytes]] = queue.Queue(maxsize=_WRITE_QUEUE_SIZE) + writer_error: List[BaseException] = [] + + def _stdout_writer() -> None: + """Dedicated thread that writes queued messages to stdout.""" + try: + while True: + data = write_queue.get() + if data is None: + break + total = 0 + while total < len(data): + written = os.write(real_fd, data[total:]) + total += written + except BaseException as exc: + writer_error.append(exc) + + writer = threading.Thread(target=_stdout_writer, name="stdout-writer", daemon=True) + writer.start() try: + last_progress = time.monotonic() for message in messages: + if writer_error: + raise writer_error[0] data = f"{message}\n".encode() - _write_all_nonblocking(real_fd, data) + while True: + try: + write_queue.put(data, timeout=1.0) + last_progress = time.monotonic() + break + except queue.Full: + if writer_error: + raise writer_error[0] + elapsed = time.monotonic() - last_progress + if elapsed > _WATCHDOG_TIMEOUT_S: + raise RuntimeError( + f"stdout pipe blocked for {elapsed:.0f}s with no progress " + f"(watchdog timeout={_WATCHDOG_TIMEOUT_S}s). " + "Terminating process to prevent indefinite hang." + ) finally: - try: - os.set_blocking(real_fd, original_blocking) - except OSError: - logger.debug("Failed to restore stdout blocking mode", exc_info=True) - - -def _write_all_nonblocking(fd: int, data: bytes) -> None: - """Write all bytes to a non-blocking fd, retrying with select on EAGAIN.""" - total_written = 0 - last_progress = time.monotonic() - - while total_written < len(data): - try: - written = os.write(fd, data[total_written:]) - total_written += written - last_progress = time.monotonic() - except BlockingIOError: - # Pipe buffer is full. Wait up to 1 second for it to become - # writable, then retry. The short timeout keeps the main - # thread responsive and allows periodic stall detection. - _, writable, _ = select.select([], [fd], [], 1.0) - if not writable: - elapsed = time.monotonic() - last_progress - if elapsed > 600: - raise RuntimeError( - f"stdout pipe blocked for {elapsed:.0f}s with no progress. " - "The platform is not reading from the source container pipe." - ) + # Signal writer to drain remaining messages and exit. + write_queue.put(None) + writer.join(timeout=30) def _init_internal_request_filter() -> None: From 583e6ee633806eb335301868151eb37f97acba2f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 18:05:20 +0000 Subject: [PATCH 07/12] fix: catch Exception instead of BaseException in writer thread, re-raise KeyboardInterrupt/SystemExit Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index b500c7e17..b6313850b 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -436,7 +436,7 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: _WRITE_QUEUE_SIZE = 1000 _WATCHDOG_TIMEOUT_S = 600 write_queue: queue.Queue[Optional[bytes]] = queue.Queue(maxsize=_WRITE_QUEUE_SIZE) - writer_error: List[BaseException] = [] + writer_error: List[Exception] = [] def _stdout_writer() -> None: """Dedicated thread that writes queued messages to stdout.""" @@ -449,7 +449,9 @@ def _stdout_writer() -> None: while total < len(data): written = os.write(real_fd, data[total:]) total += written - except BaseException as exc: + except (KeyboardInterrupt, SystemExit): + raise + except Exception as exc: writer_error.append(exc) writer = threading.Thread(target=_stdout_writer, name="stdout-writer", daemon=True) From b6f3f36efe8c8e606419f8e2e1866930b397a765 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 19:08:12 +0000 Subject: [PATCH 08/12] feat: add diagnostic logging to stdout writer thread to track pipe write timing Logs when os.write() blocks for >5s (indicates platform paused reading), and logs every 30s when write_queue is full. This will help validate whether the platform ever resumes reading from the pipe after a pause. Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 46 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index b6313850b..e0c672906 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -438,8 +438,17 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: write_queue: queue.Queue[Optional[bytes]] = queue.Queue(maxsize=_WRITE_QUEUE_SIZE) writer_error: List[Exception] = [] + logger = logging.getLogger("airbyte_cdk.stdout_writer") + _BLOCK_LOG_THRESHOLD_S = 5.0 # log when a single write blocks longer than this + messages_written = 0 + bytes_written = 0 + last_write_ts = time.monotonic() + total_blocked_s = 0.0 + block_count = 0 + def _stdout_writer() -> None: """Dedicated thread that writes queued messages to stdout.""" + nonlocal messages_written, bytes_written, last_write_ts, total_blocked_s, block_count try: while True: data = write_queue.get() @@ -447,8 +456,28 @@ def _stdout_writer() -> None: break total = 0 while total < len(data): + before = time.monotonic() written = os.write(real_fd, data[total:]) + elapsed = time.monotonic() - before total += written + if elapsed >= _BLOCK_LOG_THRESHOLD_S: + block_count += 1 + total_blocked_s += elapsed + logger.warning( + "STDOUT_WRITER: os.write() blocked for %.1fs " + "(wrote %d bytes). block_count=%d total_blocked=%.1fs " + "messages_written=%d bytes_written=%d queue_size=%d", + elapsed, + written, + block_count, + total_blocked_s, + messages_written, + bytes_written, + write_queue.qsize(), + ) + messages_written += 1 + bytes_written += len(data) + last_write_ts = time.monotonic() except (KeyboardInterrupt, SystemExit): raise except Exception as exc: @@ -472,10 +501,27 @@ def _stdout_writer() -> None: if writer_error: raise writer_error[0] elapsed = time.monotonic() - last_progress + if int(elapsed) % 30 == 0 and int(elapsed) > 0: + logger.warning( + "STDOUT_WRITER: write_queue full for %.0fs. " + "writer_messages=%d writer_bytes=%d " + "writer_blocks=%d writer_total_blocked=%.1fs " + "writer_last_write=%.1fs_ago queue_size=%d", + elapsed, + messages_written, + bytes_written, + block_count, + total_blocked_s, + time.monotonic() - last_write_ts, + write_queue.qsize(), + ) if elapsed > _WATCHDOG_TIMEOUT_S: raise RuntimeError( f"stdout pipe blocked for {elapsed:.0f}s with no progress " f"(watchdog timeout={_WATCHDOG_TIMEOUT_S}s). " + f"Writer stats: messages={messages_written} bytes={bytes_written} " + f"blocks={block_count} total_blocked={total_blocked_s:.1f}s " + f"last_write={time.monotonic() - last_write_ts:.1f}s ago. " "Terminating process to prevent indefinite hang." ) finally: From 75bd8909d22f2ddf1adc5871e77437d482906026 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 13:52:44 +0000 Subject: [PATCH 09/12] feat: add stderr heartbeat thread to prove platform never resumes reading from stdout pipe Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 49 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index e0c672906..f2bb99565 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -440,15 +440,51 @@ def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: logger = logging.getLogger("airbyte_cdk.stdout_writer") _BLOCK_LOG_THRESHOLD_S = 5.0 # log when a single write blocks longer than this + _HEARTBEAT_INTERVAL_S = 30.0 # emit a heartbeat to stderr every 30s messages_written = 0 bytes_written = 0 last_write_ts = time.monotonic() total_blocked_s = 0.0 block_count = 0 + pipe_blocked = False # True while os.write() is in progress + pipe_blocked_since = 0.0 # monotonic timestamp when current os.write() started + heartbeat_stop = threading.Event() + + def _heartbeat() -> None: + """Emit periodic status to stderr so we can prove pipe-blocking in Cloud logs. + + This thread writes directly to fd 2 (stderr) which is collected by the + Kubernetes container runtime independently of the orchestrator that reads + stdout. Even when the orchestrator stops reading stdout, these heartbeat + lines should still appear in the Cloud job logs. + """ + start = time.monotonic() + stderr_fd = 2 # write directly to fd 2, bypassing Python buffering + while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S): + now = time.monotonic() + elapsed_total = now - start + blocked_str = "YES" if pipe_blocked else "NO" + blocked_dur = f" blocked_since={now - pipe_blocked_since:.0f}s" if pipe_blocked else "" + line = ( + f"STDOUT_WRITER_HEARTBEAT: t={elapsed_total:.0f}s " + f"msgs={messages_written} bytes={bytes_written} " + f"pipe_blocked={blocked_str}{blocked_dur} " + f"queue={write_queue.qsize()}/{_WRITE_QUEUE_SIZE}\n" + ) + try: + os.write(stderr_fd, line.encode()) + except OSError: + pass # stderr itself might be broken; nothing we can do + + heartbeat_thread = threading.Thread( + target=_heartbeat, name="stdout-writer-heartbeat", daemon=True + ) + heartbeat_thread.start() def _stdout_writer() -> None: """Dedicated thread that writes queued messages to stdout.""" nonlocal messages_written, bytes_written, last_write_ts, total_blocked_s, block_count + nonlocal pipe_blocked, pipe_blocked_since try: while True: data = write_queue.get() @@ -456,9 +492,12 @@ def _stdout_writer() -> None: break total = 0 while total < len(data): - before = time.monotonic() + pipe_blocked = True + pipe_blocked_since = time.monotonic() + before = pipe_blocked_since written = os.write(real_fd, data[total:]) elapsed = time.monotonic() - before + pipe_blocked = False total += written if elapsed >= _BLOCK_LOG_THRESHOLD_S: block_count += 1 @@ -485,6 +524,13 @@ def _stdout_writer() -> None: writer = threading.Thread(target=_stdout_writer, name="stdout-writer", daemon=True) writer.start() + logger.info( + "STDOUT_WRITER: started writer_thread fd=%d queue_size=%d watchdog=%ds heartbeat=%ds", + real_fd, + _WRITE_QUEUE_SIZE, + _WATCHDOG_TIMEOUT_S, + int(_HEARTBEAT_INTERVAL_S), + ) try: last_progress = time.monotonic() @@ -525,6 +571,7 @@ def _stdout_writer() -> None: "Terminating process to prevent indefinite hang." ) finally: + heartbeat_stop.set() # Signal writer to drain remaining messages and exit. write_queue.put(None) writer.join(timeout=30) From 2fc96fa4bf09e9da10214e539a48c8b9c400db96 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 21:30:59 +0000 Subject: [PATCH 10/12] refactor: simplify to heartbeat-only diagnostic (remove writer thread/queue/watchdog) Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 203 ++++++-------------------------------- 1 file changed, 31 insertions(+), 172 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index f2bb99565..98018e0f4 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -9,7 +9,6 @@ import logging import os import os.path -import queue import socket import sys import tempfile @@ -378,203 +377,63 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]: def launch(source: Source, args: List[str]) -> None: source_entrypoint = AirbyteEntrypoint(source) parsed_args = source_entrypoint.parse_args(args) - _nonblocking_write_to_stdout(source_entrypoint.run(parsed_args)) - -def _nonblocking_write_to_stdout(messages: Iterable[str]) -> None: - """Write messages to stdout via a dedicated writer thread to prevent deadlocks. - - When the Airbyte platform pauses reading from the source container's - stdout pipe, a blocking ``write()`` in the main thread stalls message - processing. Since the main thread is also responsible for draining the - internal record queue, this causes a cascading deadlock: the queue - fills, worker threads block on ``queue.put()``, and the entire process - hangs. - - This function decouples stdout writing from the main thread by routing - messages through a bounded internal queue to a dedicated writer thread. - The writer thread performs normal blocking ``os.write()`` calls; if the - pipe is full, only the writer thread stalls — the main thread continues - iterating the message generator (which drains the record queue). - - When the internal write queue fills (because the writer is blocked on - the pipe), the main thread retries ``queue.put()`` with a 1-second - timeout. A watchdog detects if no message has been accepted for 600 - seconds and raises ``RuntimeError`` to terminate the process cleanly. - """ - # In test environments (pytest capsys) or wrappers like PRINT_BUFFER, - # sys.stdout may have been replaced. Detect this via fileno() and - # fall back to print() so output goes through the capture layer. - try: - current_fd = sys.stdout.fileno() - except (OSError, AttributeError, ValueError): - for message in messages: - print(f"{message}\n", end="") - return - - real_stdout = sys.__stdout__ - if real_stdout is None or not hasattr(real_stdout, "fileno"): - for message in messages: - print(f"{message}\n", end="") - return - - try: - real_fd = real_stdout.fileno() - except (OSError, AttributeError, ValueError): - for message in messages: - print(f"{message}\n", end="") - return - - if current_fd != real_fd: - for message in messages: - print(f"{message}\n", end="") - return - - # Bounded queue decouples the main thread from stdout I/O. - # The writer thread does blocking writes; if the pipe is full only the - # writer stalls — the main thread keeps draining the record queue. - _WRITE_QUEUE_SIZE = 1000 - _WATCHDOG_TIMEOUT_S = 600 - write_queue: queue.Queue[Optional[bytes]] = queue.Queue(maxsize=_WRITE_QUEUE_SIZE) - writer_error: List[Exception] = [] - - logger = logging.getLogger("airbyte_cdk.stdout_writer") - _BLOCK_LOG_THRESHOLD_S = 5.0 # log when a single write blocks longer than this - _HEARTBEAT_INTERVAL_S = 30.0 # emit a heartbeat to stderr every 30s + # Heartbeat state — shared with the background heartbeat thread. + _HEARTBEAT_INTERVAL_S = 30.0 messages_written = 0 bytes_written = 0 - last_write_ts = time.monotonic() - total_blocked_s = 0.0 - block_count = 0 - pipe_blocked = False # True while os.write() is in progress - pipe_blocked_since = 0.0 # monotonic timestamp when current os.write() started + print_blocked = False + print_blocked_since = 0.0 heartbeat_stop = threading.Event() def _heartbeat() -> None: - """Emit periodic status to stderr so we can prove pipe-blocking in Cloud logs. + """Emit periodic status to stderr to diagnose stdout pipe blocking. - This thread writes directly to fd 2 (stderr) which is collected by the - Kubernetes container runtime independently of the orchestrator that reads - stdout. Even when the orchestrator stops reading stdout, these heartbeat - lines should still appear in the Cloud job logs. + Writes directly to fd 2 (stderr) which the Kubernetes container + runtime collects independently of the orchestrator reading stdout. """ start = time.monotonic() - stderr_fd = 2 # write directly to fd 2, bypassing Python buffering + stderr_fd = 2 while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S): now = time.monotonic() - elapsed_total = now - start - blocked_str = "YES" if pipe_blocked else "NO" - blocked_dur = f" blocked_since={now - pipe_blocked_since:.0f}s" if pipe_blocked else "" + elapsed = now - start + blocked_str = "YES" if print_blocked else "NO" + blocked_dur = ( + f" blocked_since={now - print_blocked_since:.0f}s" + if print_blocked + else "" + ) line = ( - f"STDOUT_WRITER_HEARTBEAT: t={elapsed_total:.0f}s " + f"STDOUT_HEARTBEAT: t={elapsed:.0f}s " f"msgs={messages_written} bytes={bytes_written} " - f"pipe_blocked={blocked_str}{blocked_dur} " - f"queue={write_queue.qsize()}/{_WRITE_QUEUE_SIZE}\n" + f"print_blocked={blocked_str}{blocked_dur}\n" ) try: os.write(stderr_fd, line.encode()) except OSError: - pass # stderr itself might be broken; nothing we can do + pass heartbeat_thread = threading.Thread( - target=_heartbeat, name="stdout-writer-heartbeat", daemon=True + target=_heartbeat, name="stdout-heartbeat", daemon=True ) heartbeat_thread.start() - def _stdout_writer() -> None: - """Dedicated thread that writes queued messages to stdout.""" - nonlocal messages_written, bytes_written, last_write_ts, total_blocked_s, block_count - nonlocal pipe_blocked, pipe_blocked_since - try: - while True: - data = write_queue.get() - if data is None: - break - total = 0 - while total < len(data): - pipe_blocked = True - pipe_blocked_since = time.monotonic() - before = pipe_blocked_since - written = os.write(real_fd, data[total:]) - elapsed = time.monotonic() - before - pipe_blocked = False - total += written - if elapsed >= _BLOCK_LOG_THRESHOLD_S: - block_count += 1 - total_blocked_s += elapsed - logger.warning( - "STDOUT_WRITER: os.write() blocked for %.1fs " - "(wrote %d bytes). block_count=%d total_blocked=%.1fs " - "messages_written=%d bytes_written=%d queue_size=%d", - elapsed, - written, - block_count, - total_blocked_s, - messages_written, - bytes_written, - write_queue.qsize(), - ) + # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs + # Refer to: https://github.com/airbytehq/oncall/issues/6235 + try: + with PRINT_BUFFER: + for message in source_entrypoint.run(parsed_args): + # simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and + # the other for the break line. Adding `\n` to the message ensure that both are printed at the same time + data = f"{message}\n" + print_blocked = True + print_blocked_since = time.monotonic() + print(data, end="") + print_blocked = False messages_written += 1 bytes_written += len(data) - last_write_ts = time.monotonic() - except (KeyboardInterrupt, SystemExit): - raise - except Exception as exc: - writer_error.append(exc) - - writer = threading.Thread(target=_stdout_writer, name="stdout-writer", daemon=True) - writer.start() - logger.info( - "STDOUT_WRITER: started writer_thread fd=%d queue_size=%d watchdog=%ds heartbeat=%ds", - real_fd, - _WRITE_QUEUE_SIZE, - _WATCHDOG_TIMEOUT_S, - int(_HEARTBEAT_INTERVAL_S), - ) - - try: - last_progress = time.monotonic() - for message in messages: - if writer_error: - raise writer_error[0] - data = f"{message}\n".encode() - while True: - try: - write_queue.put(data, timeout=1.0) - last_progress = time.monotonic() - break - except queue.Full: - if writer_error: - raise writer_error[0] - elapsed = time.monotonic() - last_progress - if int(elapsed) % 30 == 0 and int(elapsed) > 0: - logger.warning( - "STDOUT_WRITER: write_queue full for %.0fs. " - "writer_messages=%d writer_bytes=%d " - "writer_blocks=%d writer_total_blocked=%.1fs " - "writer_last_write=%.1fs_ago queue_size=%d", - elapsed, - messages_written, - bytes_written, - block_count, - total_blocked_s, - time.monotonic() - last_write_ts, - write_queue.qsize(), - ) - if elapsed > _WATCHDOG_TIMEOUT_S: - raise RuntimeError( - f"stdout pipe blocked for {elapsed:.0f}s with no progress " - f"(watchdog timeout={_WATCHDOG_TIMEOUT_S}s). " - f"Writer stats: messages={messages_written} bytes={bytes_written} " - f"blocks={block_count} total_blocked={total_blocked_s:.1f}s " - f"last_write={time.monotonic() - last_write_ts:.1f}s ago. " - "Terminating process to prevent indefinite hang." - ) finally: heartbeat_stop.set() - # Signal writer to drain remaining messages and exit. - write_queue.put(None) - writer.join(timeout=30) def _init_internal_request_filter() -> None: From 17b4a6f7596e2542a51887710a276b723471161b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 21:32:20 +0000 Subject: [PATCH 11/12] style: fix ruff formatting Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 98018e0f4..0205d95cf 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -399,9 +399,7 @@ def _heartbeat() -> None: elapsed = now - start blocked_str = "YES" if print_blocked else "NO" blocked_dur = ( - f" blocked_since={now - print_blocked_since:.0f}s" - if print_blocked - else "" + f" blocked_since={now - print_blocked_since:.0f}s" if print_blocked else "" ) line = ( f"STDOUT_HEARTBEAT: t={elapsed:.0f}s " @@ -413,9 +411,7 @@ def _heartbeat() -> None: except OSError: pass - heartbeat_thread = threading.Thread( - target=_heartbeat, name="stdout-heartbeat", daemon=True - ) + heartbeat_thread = threading.Thread(target=_heartbeat, name="stdout-heartbeat", daemon=True) heartbeat_thread.start() # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs From 3a41aaa2b50f8e654ebc4372cfdbb80973952cf8 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 21:35:23 +0000 Subject: [PATCH 12/12] style: add explanatory comment to empty except clause Co-Authored-By: unknown <> --- airbyte_cdk/entrypoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 0205d95cf..c771a96bd 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -409,7 +409,7 @@ def _heartbeat() -> None: try: os.write(stderr_fd, line.encode()) except OSError: - pass + pass # Best-effort diagnostic — if stderr (fd 2) is broken, silently give up. heartbeat_thread = threading.Thread(target=_heartbeat, name="stdout-heartbeat", daemon=True) heartbeat_thread.start()