Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 130 additions & 16 deletions tubesync/common/logs/syslog/hat/_default.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import collections
import contextlib
from dataclasses import dataclass, field
import logging
import math
import os
import queue
import random
import socket
import ssl
import threading
import time

from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone as dt_timezone
from typing import Optional, Tuple

from ._logger import logger
Expand Down Expand Up @@ -57,6 +60,16 @@ def __init__(self, host, port, comm_type, queue_size, reconnect_delay, *args, **
}


@dataclass
class _ReconnectionState:
timeout: float = 5.0
budget: int = 60
cap: int = 1000
snapshot: int = -1
success: int = 10
failure: int = 5


@dataclass(frozen=True)
class RetryItem:
"""Encapsulates a structured syslog entry in the retry transport pipeline."""
Expand Down Expand Up @@ -89,7 +102,19 @@ def _create_udp_socket(state):
return s


def _item_completed(retry_queue, core_queue, item):
def _get_exponential_delay(remaining, total, base_delay):
"""
Dynamically tracks exponential backoff relative to any base delay input.
"""
ratio = max(0.0, min(1.0, 1.0 - (remaining / total)))

# Stays proportional: start_mult = 0.1, end_mult = 5.0
multiplier = 0.1 * (50.0 ** ratio)

return base_delay * multiplier


def _item_completed(retry_queue, core_queue, item, reconnect=None):
"""
Drains the processed item from the retry queue and issues a task acknowledgment
to the synchronized queue if the item was not synthetically created.
Expand All @@ -98,9 +123,14 @@ def _item_completed(retry_queue, core_queue, item):
retry_queue.popleft()
if not item.synthetic:
core_queue.task_done()
# Every sent item increases the budget by a single unit.
# This makes it harder to shutdown because of budget exhaustion
# when there was a working endpoint before the failure.
if reconnect is not None:
reconnect.budget = min(reconnect.cap, 1 + reconnect.budget)


def _logging_handler_thread(state, logger=logger):
def _logging_handler_thread(state, shutdown=None, logger=logger):
"""
Worker thread that drains messages and guarantees strict transport-level delivery
by checking stateless item origins packed inside RetryItem containers.
Expand All @@ -123,9 +153,24 @@ def _logging_handler_thread(state, logger=logger):

# Chronological staging queue dedicated to maintaining strict FIFO order during outages
retry_queue = collections.deque()
retry_queue_size = 0

# Reconnection budgetting
reconnect = _ReconnectionState(
timeout=state.reconnect_delay,
)

# Loop persists on shutdown until retry_queue is fully empty.
# Loop persists on shutdown until the retry queue is fully empty.
while retry_queue or not state.closed.is_set():
# the reconnect budget was exhausted, drain the retry queue
if 0 > reconnect.budget and state.closed.is_set():
retry_queue_size = len(retry_queue)
while retry_queue:
item = retry_queue.popleft()
if not item.synthetic:
state.queue.task_done()
continue

# connect to the endpoint
s = None
try:
Expand All @@ -137,8 +182,44 @@ def _logging_handler_thread(state, logger=logger):
if s is not None:
with contextlib.suppress(Exception):
s.close()
time.sleep(state.reconnect_delay)
if 0 > reconnect.snapshot:
reconnect.snapshot = max(10, reconnect.budget)
reconnect.budget -= reconnect.failure
if 0 > reconnect.budget:
state.closed.set()
else:
reconnect.timeout = _get_exponential_delay(
reconnect.budget,
reconnect.snapshot,
state.reconnect_delay,
)
# jitter
if 1 >= reconnect.timeout:
reconnect.timeout = max(1.0, 1.0 + reconnect.timeout)
reconnect.timeout = random.uniform(
math.floor(reconnect.timeout),
reconnect.timeout,
)
now_dt = datetime.now(tz=dt_timezone.utc)
next_retry_time = now_dt + timedelta(seconds=reconnect.timeout)
iso_timestamp = next_retry_time.isoformat(timespec='seconds')
if reconnect.timeout > state.reconnect_delay:
logger.warning(
'Persistent failures to connect to: '
f'{state.host}:{state.port}/{state.comm_type.name}\n'
'\tIs the endpoint service running??\n'
'\tRetries remaining: '
f'{1 + (reconnect.budget // reconnect.failure)}\n'
f'\tNext attempt at: {iso_timestamp}',
)
time.sleep(reconnect.timeout)
continue
else:
reconnect.snapshot = -1
reconnect.budget = min(
reconnect.cap,
reconnect.success + reconnect.budget,
)

# Connection successfully established; enter message transmission loop
# while True optimizes throughput and relies on socket exceptions to break the loop context
Expand Down Expand Up @@ -216,7 +297,7 @@ def _logging_handler_thread(state, logger=logger):
# After reconnecting we will try it again.
break
else:
_item_completed(retry_queue, state.queue, item)
_item_completed(retry_queue, state.queue, item, reconnect)
finally:
# Clear references to optimize memory tracking
item = None
Expand All @@ -226,6 +307,27 @@ def _logging_handler_thread(state, logger=logger):
with contextlib.suppress(Exception):
s.close()

# --- DRAIN QUEUE AND SHUTDOWN ---
# The budget hit 0 (nothing is listening). We must drain everything cleanly
# to prevent state.queue.join() block-freezing the rest of the application.
# The retry queue was already cleanly drained inside the while loop.
if 0 > reconnect.budget:
state_queue_size = 0
while True:
try:
state.queue.get_nowait()
state.queue.task_done()
state_queue_size += 1
except queue.Empty:
break
logger.warning(
f'Thread shutdown complete. Purged {retry_queue_size} local retry items '
f'and {state_queue_size} queued items to prevent application blocks.'
)
# Use the callback function when it was provided.
if shutdown is not None and callable(shutdown):
shutdown()


class SyslogHandler(hat_syslog_handler_SyslogHandler):
"""
Expand All @@ -241,7 +343,7 @@ class SyslogHandler(hat_syslog_handler_SyslogHandler):
corruption resulting from os.fork(), and avoids blocking task-execution loops
during a graceful worker process shutdown.
"""
def __init__(self, host, port, comm_type, queue_size=1024, reconnect_delay=5, *args, **kwargs):
def __init__(self, host, port, comm_type, queue_size=1024, reconnect_delay=5, logger=logger, *args, **kwargs):
"""Initializes the wrapper and neutralizes conflicting parent process states."""
super().__init__(host, port, common.CommType.UDP, queue_size, reconnect_delay, *args, **kwargs)

Expand Down Expand Up @@ -270,6 +372,7 @@ def __init__(self, host, port, comm_type, queue_size=1024, reconnect_delay=5, *a
self.__thread = None
self._initial_pid = os.getpid()
self._closing = threading.Event()
self._logger = logger

# Save the original arguments
self.host = host
Expand Down Expand Up @@ -332,10 +435,21 @@ def _create_thread(self):
initial = self.__thread is None
with self.__state.cv:
previous = self.__thread
cb_closing = self._closing
cb_closed = self.__state.closed
cb_handler_close = super(hat_syslog_handler_SyslogHandler, self).close
def shutdown_cb():
cb_closing.set()
cb_closed.set()
cb_handler_close()

self.__thread = threading.Thread(
target=_logging_handler_thread,
args=(self.__state,),
kwargs=dict(
state=self.__state,
logger=self._logger,
shutdown=shutdown_cb,
),
daemon=True,
)

Expand All @@ -349,7 +463,7 @@ def _create_thread(self):
scoreboard.alive = previous._scoreboard.alive
scoreboard.initialized = previous._scoreboard.initialized
scoreboard.previous_start = previous._scoreboard.start
logger.debug(f'Created a replacement thread: {scoreboard=}')
self._logger.debug(f'Created a replacement thread: {scoreboard=}')

self.__thread.start()

Expand Down Expand Up @@ -391,22 +505,22 @@ def emit(self, record):
"""Enqueues new log records and guarantees active connection coverage."""
if self._closing.is_set():
with contextlib.suppress(Exception):
logger.handle(record)
self._logger.handle(record)
return

self._create_thread()

if not self._alive_thread():
with contextlib.suppress(Exception):
logger.handle(record)
self._logger.handle(record)
return

state = self.__state
if state.closed.is_set():
self._closing.set()
logger.warning('Closed in emit')
self._logger.warning('Closed in emit')
with contextlib.suppress(Exception):
logger.handle(record)
self._logger.handle(record)
return

msg = _record_to_msg(record)
Expand All @@ -422,9 +536,9 @@ def emit(self, record):
else:
state.dropped[-1] = 1 + dropped_count

logger.warning(f'Dropped a log message in emit due to buffer overflow: {msg.msg!r}')
self._logger.warning(f'Dropped a log message in emit due to buffer overflow: {msg.msg!r}')
with contextlib.suppress(Exception):
logger.handle(record)
self._logger.handle(record)

def flush(self):
"""Blocks execution until the internal logging queue is empty."""
Expand Down Expand Up @@ -457,7 +571,7 @@ def close(self):
# The native queue.join() blockade is now perfectly synchronized with the internal
# tracking flow loop. It will block until retry_queue is 100% empty.
if self._alive_thread():
logger.debug('Flushing logging queue in close')
self._logger.debug('Flushing logging queue in close')
with contextlib.suppress(Exception):
state.queue.join()

Expand Down