def launch(source: Source, args: List[str]) -> None:
    """Run the source entrypoint and print every message it yields to stdout.

    While messages are being written, a daemon thread emits a
    ``STDOUT_HEARTBEAT`` line to stderr (fd 2) every 30 seconds. Each line
    reports the elapsed time, the number of messages and bytes printed so far,
    and whether the main thread is currently inside ``print`` (and for how
    long), which helps diagnose a blocked stdout pipe.

    Args:
        source: The source handed to ``AirbyteEntrypoint``.
        args: Command-line arguments, parsed with
            ``AirbyteEntrypoint.parse_args``.
    """
    # Function-scope imports keep the heartbeat's dependencies with the only
    # code that uses them.
    import os
    import threading
    import time

    source_entrypoint = AirbyteEntrypoint(source)
    parsed_args = source_entrypoint.parse_args(args)

    heartbeat_interval_s = 30.0
    # Upper bound on how long shutdown waits for the heartbeat thread; it may
    # be stuck in os.write if stderr itself is not being drained.
    heartbeat_join_timeout_s = 1.0
    stderr_fd = 2

    # Heartbeat state: written only by this thread, read only by the heartbeat
    # thread through the closure.
    messages_written = 0
    bytes_written = 0
    # Monotonic timestamp of the print call currently in flight, or None when
    # no print is in progress. A single variable (instead of a flag plus a
    # timestamp) means the heartbeat can never pair "blocked" with a stale
    # start time.
    print_started_at: Optional[float] = None
    heartbeat_stop = threading.Event()

    def _heartbeat() -> None:
        """Write a status line to stderr every interval until asked to stop.

        Uses ``os.write`` on fd 2 directly, so the report does not go through
        ``sys.stdout`` / ``sys.stderr`` objects.
        """
        start = time.monotonic()
        while not heartbeat_stop.wait(timeout=heartbeat_interval_s):
            now = time.monotonic()
            # Snapshot once so the check and the subtraction use the same value.
            blocked_at = print_started_at
            if blocked_at is None:
                blocked = "NO"
            else:
                blocked = f"YES blocked_since={now - blocked_at:.0f}s"
            line = (
                f"STDOUT_HEARTBEAT: t={now - start:.0f}s "
                f"msgs={messages_written} bytes={bytes_written} "
                f"print_blocked={blocked}\n"
            )
            try:
                os.write(stderr_fd, line.encode())
            except OSError:
                # Best-effort diagnostic: a failed write is ignored and the
                # next interval simply tries again.
                pass

    heartbeat_thread = threading.Thread(target=_heartbeat, name="stdout-heartbeat", daemon=True)
    heartbeat_thread.start()

    # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs
    # Refer to: https://github.com/airbytehq/oncall/issues/6235
    # NOTE(review): PRINT_BUFFER is in use below, so the comment above looks stale; confirm.
    try:
        with PRINT_BUFFER:
            for message in source_entrypoint.run(parsed_args):
                # A bare print() issues two writes (the message, then the line
                # break), which interleaves badly under the concurrent CDK.
                # Appending "\n" ourselves makes it a single write.
                data = f"{message}\n"
                print_started_at = time.monotonic()
                print(data, end="")
                print_started_at = None
                messages_written += 1
                # Count encoded bytes, not characters, so non-ASCII payloads
                # are reported accurately.
                # assumes stdout is UTF-8 encoded; TODO confirm
                bytes_written += len(data.encode("utf-8", errors="replace"))
    finally:
        heartbeat_stop.set()
        # The event wakes the heartbeat immediately; the bounded join keeps it
        # from writing during interpreter shutdown without risking a hang.
        heartbeat_thread.join(timeout=heartbeat_join_timeout_s)