From 0c6503d088116b61f0bf839554288bbda628a287 Mon Sep 17 00:00:00 2001 From: tcely Date: Tue, 26 May 2026 09:15:31 -0400 Subject: [PATCH] feat(logging): give up on missing connections Implement an isolated, unit-based reconnection tracking engine inside the background logging handler thread. This structure natively handles both instant startup dropouts (never listening) and long-term service disappearances without tracking raw wall-clock timers. Key architectural controls: - Encapsulates dynamic telemetry properties (budget, first-failure snapshot, timeout values, and bonus/cost configurations) directly inside a private _ReconnectionState dataclass instantiated inside the thread. - Implements a 2:1 recovery ratio (failures cost 5 units; successful handshakes reward 10 units plus 1 unit per item sent) capped strictly at 1,000 units. - Implements a narrow-band jitter range combined with a 1.0+ additive guard. This prevents rapid-fire sub-second retry loops on early drops while injecting fractional millisecond variance to break up concurrent thundering herds. 
--- tubesync/common/logs/syslog/hat/_default.py | 146 +++++++++++++++++--- 1 file changed, 130 insertions(+), 16 deletions(-) diff --git a/tubesync/common/logs/syslog/hat/_default.py b/tubesync/common/logs/syslog/hat/_default.py index a5f335d21..dcd13172a 100644 --- a/tubesync/common/logs/syslog/hat/_default.py +++ b/tubesync/common/logs/syslog/hat/_default.py @@ -1,14 +1,17 @@ import collections import contextlib -from dataclasses import dataclass, field import logging +import math import os import queue +import random import socket import ssl import threading import time +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone as dt_timezone from typing import Optional, Tuple from ._logger import logger @@ -57,6 +60,16 @@ def __init__(self, host, port, comm_type, queue_size, reconnect_delay, *args, ** } + @dataclass + class _ReconnectionState: + timeout: float = 5.0 + budget: int = 60 + cap: int = 1000 + snapshot: int = -1 + success: int = 10 + failure: int = 5 + + @dataclass(frozen=True) class RetryItem: """Encapsulates a structured syslog entry in the retry transport pipeline.""" @@ -89,7 +102,19 @@ def _create_udp_socket(state): return s - def _item_completed(retry_queue, core_queue, item): + def _get_exponential_delay(remaining, total, base_delay): + """ + Dynamically tracks exponential backoff relative to any base delay input. + """ + ratio = max(0.0, min(1.0, 1.0 - (remaining / total))) + + # Stays proportional: start_mult = 0.1, end_mult = 5.0 + multiplier = 0.1 * (50.0 ** ratio) + + return base_delay * multiplier + + + def _item_completed(retry_queue, core_queue, item, reconnect=None): """ Drains the processed item from the retry queue and issues a task acknowledgment to the synchronized queue if the item was not synthetically created. 
@@ -98,9 +123,14 @@ def _item_completed(retry_queue, core_queue, item): retry_queue.popleft() if not item.synthetic: core_queue.task_done() + # Every sent item increases the budget by a single unit. + # This makes it harder to shut down because of budget exhaustion + # when there was a working endpoint before the failure. + if reconnect is not None: + reconnect.budget = min(reconnect.cap, 1 + reconnect.budget) - def _logging_handler_thread(state, logger=logger): + def _logging_handler_thread(state, shutdown=None, logger=logger): """ Worker thread that drains messages and guarantees strict transport-level delivery by checking stateless item origins packed inside RetryItem containers. @@ -123,9 +153,24 @@ def _logging_handler_thread(state, logger=logger): # Chronological staging queue dedicated to maintaining strict FIFO order during outages retry_queue = collections.deque() + retry_queue_size = 0 + + # Reconnection budgeting + reconnect = _ReconnectionState( + timeout=state.reconnect_delay, + ) - # Loop persists on shutdown until retry_queue is fully empty. + # Loop persists on shutdown until the retry queue is fully empty. 
while retry_queue or not state.closed.is_set(): + # the reconnect budget was exhausted, drain the retry queue + if 0 > reconnect.budget and state.closed.is_set(): + retry_queue_size = len(retry_queue) + while retry_queue: + item = retry_queue.popleft() + if not item.synthetic: + state.queue.task_done() + continue + # connect to the endpoint s = None try: @@ -137,8 +182,44 @@ def _logging_handler_thread(state, logger=logger): if s is not None: with contextlib.suppress(Exception): s.close() - time.sleep(state.reconnect_delay) + if 0 > reconnect.snapshot: + reconnect.snapshot = max(10, reconnect.budget) + reconnect.budget -= reconnect.failure + if 0 > reconnect.budget: + state.closed.set() + else: + reconnect.timeout = _get_exponential_delay( + reconnect.budget, + reconnect.snapshot, + state.reconnect_delay, + ) + # jitter + if 1 >= reconnect.timeout: + reconnect.timeout = max(1.0, 1.0 + reconnect.timeout) + reconnect.timeout = random.uniform( + math.floor(reconnect.timeout), + reconnect.timeout, + ) + now_dt = datetime.now(tz=dt_timezone.utc) + next_retry_time = now_dt + timedelta(seconds=reconnect.timeout) + iso_timestamp = next_retry_time.isoformat(timespec='seconds') + if reconnect.timeout > state.reconnect_delay: + logger.warning( + 'Persistent failures to connect to: ' + f'{state.host}:{state.port}/{state.comm_type.name}\n' + '\tIs the endpoint service running??\n' + '\tRetries remaining: ' + f'{1 + (reconnect.budget // reconnect.failure)}\n' + f'\tNext attempt at: {iso_timestamp}', + ) + time.sleep(reconnect.timeout) continue + else: + reconnect.snapshot = -1 + reconnect.budget = min( + reconnect.cap, + reconnect.success + reconnect.budget, + ) # Connection successfully established; enter message transmission loop # while True optimizes throughput and relies on socket exceptions to break the loop context @@ -216,7 +297,7 @@ def _logging_handler_thread(state, logger=logger): # After reconnecting we will try it again. 
break else: - _item_completed(retry_queue, state.queue, item) + _item_completed(retry_queue, state.queue, item, reconnect) finally: # Clear references to optimize memory tracking item = None @@ -226,6 +307,27 @@ def _logging_handler_thread(state, logger=logger): with contextlib.suppress(Exception): s.close() + # --- DRAIN QUEUE AND SHUTDOWN --- + # The budget dropped below 0 (nothing is listening). We must drain everything cleanly + # to prevent state.queue.join() from freezing the rest of the application. + # The retry queue was already cleanly drained inside the while loop. + if 0 > reconnect.budget: + state_queue_size = 0 + while True: + try: + state.queue.get_nowait() + state.queue.task_done() + state_queue_size += 1 + except queue.Empty: + break + logger.warning( + f'Thread shutdown complete. Purged {retry_queue_size} local retry items ' + f'and {state_queue_size} queued items to prevent application blocks.' + ) + # Use the callback function when it was provided. + if shutdown is not None and callable(shutdown): + shutdown() + class SyslogHandler(hat_syslog_handler_SyslogHandler): """ @@ -241,7 +343,7 @@ class SyslogHandler(hat_syslog_handler_SyslogHandler): corruption resulting from os.fork(), and avoids blocking task-execution loops during a graceful worker process shutdown. 
""" - def __init__(self, host, port, comm_type, queue_size=1024, reconnect_delay=5, *args, **kwargs): + def __init__(self, host, port, comm_type, queue_size=1024, reconnect_delay=5, logger=logger, *args, **kwargs): """Initializes the wrapper and neutralizes conflicting parent process states.""" super().__init__(host, port, common.CommType.UDP, queue_size, reconnect_delay, *args, **kwargs) @@ -270,6 +372,7 @@ def __init__(self, host, port, comm_type, queue_size=1024, reconnect_delay=5, *a self.__thread = None self._initial_pid = os.getpid() self._closing = threading.Event() + self._logger = logger # Save the original arguments self.host = host @@ -332,10 +435,21 @@ def _create_thread(self): initial = self.__thread is None with self.__state.cv: previous = self.__thread + cb_closing = self._closing + cb_closed = self.__state.closed + cb_handler_close = super(hat_syslog_handler_SyslogHandler, self).close + def shutdown_cb(): + cb_closing.set() + cb_closed.set() + cb_handler_close() self.__thread = threading.Thread( target=_logging_handler_thread, - args=(self.__state,), + kwargs=dict( + state=self.__state, + logger=self._logger, + shutdown=shutdown_cb, + ), daemon=True, ) @@ -349,7 +463,7 @@ def _create_thread(self): scoreboard.alive = previous._scoreboard.alive scoreboard.initialized = previous._scoreboard.initialized scoreboard.previous_start = previous._scoreboard.start - logger.debug(f'Created a replacement thread: {scoreboard=}') + self._logger.debug(f'Created a replacement thread: {scoreboard=}') self.__thread.start() @@ -391,22 +505,22 @@ def emit(self, record): """Enqueues new log records and guarantees active connection coverage.""" if self._closing.is_set(): with contextlib.suppress(Exception): - logger.handle(record) + self._logger.handle(record) return self._create_thread() if not self._alive_thread(): with contextlib.suppress(Exception): - logger.handle(record) + self._logger.handle(record) return state = self.__state if state.closed.is_set(): 
self._closing.set() - logger.warning('Closed in emit') + self._logger.warning('Closed in emit') with contextlib.suppress(Exception): - logger.handle(record) + self._logger.handle(record) return msg = _record_to_msg(record) @@ -422,9 +536,9 @@ def emit(self, record): else: state.dropped[-1] = 1 + dropped_count - logger.warning(f'Dropped a log message in emit due to buffer overflow: {msg.msg!r}') + self._logger.warning(f'Dropped a log message in emit due to buffer overflow: {msg.msg!r}') with contextlib.suppress(Exception): - logger.handle(record) + self._logger.handle(record) def flush(self): """Blocks execution until the internal logging queue is empty.""" @@ -457,7 +571,7 @@ def close(self): # The native queue.join() blockade is now perfectly synchronized with the internal # tracking flow loop. It will block until retry_queue is 100% empty. if self._alive_thread(): - logger.debug('Flushing logging queue in close') + self._logger.debug('Flushing logging queue in close') with contextlib.suppress(Exception): state.queue.join()