diff --git a/tubesync/common/huey_syslog.py b/tubesync/common/huey_syslog.py deleted file mode 100644 index ecacd2ed0..000000000 --- a/tubesync/common/huey_syslog.py +++ /dev/null @@ -1,540 +0,0 @@ -import collections -import contextlib -from dataclasses import dataclass, field -import logging -import os -import queue -import socket -import ssl -import threading -import time - -from typing import Optional, Tuple - -from hat.syslog import common, encoder -from hat.syslog.handler import ( - SyslogHandler as hat_syslog_handler_SyslogHandler, - _ThreadState, - _create_dropped_msg, - _record_to_msg, -) - - -logger = logging.getLogger(__name__) - -_SOCKET_FACTORIES = { - common.CommType.TCP: lambda state, ctx: _create_tcp_socket(state), - common.CommType.TLS: lambda state, ctx: _create_tcp_socket(state, ctx), - common.CommType.UDP: lambda state, ctx: _create_udp_socket(state), -} - - -@dataclass(frozen=True) -class RetryItem: - """Encapsulates a structured syslog entry in the retry transport pipeline.""" - synthetic: bool - msg: common.Msg - - -@dataclass -class ThreadScoreboard: - """Tracks precision execution lifecycles and diagnostic markers for background workers.""" - start: Tuple[float, int] = field(default_factory=lambda: (time.time(), time.monotonic_ns())) - alive: Optional[Tuple[float, int]] = None - initialized: Optional[Tuple[float, int]] = None - previous_start: Optional[Tuple[float, int]] = None - - -def _create_tcp_socket(state, ctx=None): - """Establishes an optimized TCP or wrapped TLS stream transport connection.""" - s = socket.create_connection((state.host, state.port), timeout=5.0) - s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - if ctx: - s = ctx.wrap_socket(s) - return s - - -def _create_udp_socket(state): - """Establishes an un-bonded UDP datagram socket connection endpoint.""" - s = socket.socket(type=socket.SOCK_DGRAM) - s.connect((state.host, state.port)) - return s - - -def _item_completed(retry_queue, core_queue, item): - """ - Drains the processed item from the retry queue and issues a task acknowledgment - to the synchronized queue if the item was not synthetically created. - """ - # SUCCESS: Remove the successfully sent item from the chronological pipeline - retry_queue.popleft() - if not item.synthetic: - core_queue.task_done() - - -def _logging_handler_thread(state, logger=logger): - """ - Worker thread that drains messages and guarantees strict transport-level delivery - by checking stateless item origins packed inside RetryItem containers. - - Independent worker thread routine responsible for draining log messages - from the synchronized queue and streaming them to the remote hat-syslog endpoint. - - Uses an internal thread-local retry queue to maintain precise chronological - ordering of log messages during transport connection failures. - - Args: - state (_ThreadState): Thread-safe tracking state containing connection params. - logger (logging.Logger): Diagnostic logger instance for transport anomalies. - """ - ctx = None - if common.CommType.TLS == state.comm_type: - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - ctx.check_hostname = False - ctx.verify_mode = ssl.VerifyMode.CERT_NONE - - # Chronological staging queue dedicated to maintaining strict FIFO order during outages - retry_queue = collections.deque() - - # Loop persists on shutdown until retry_queue is fully empty. - while retry_queue or not state.closed.is_set(): - # connect to the endpoint - s = None - try: - factory = _SOCKET_FACTORIES[state.comm_type] - s = factory(state, ctx) - except KeyError: - raise NotImplementedError(f'Unsupported comm_type: {state.comm_type}') - except Exception: - if s is not None: - with contextlib.suppress(Exception): - s.close() - time.sleep(state.reconnect_delay) - continue - - # Connection successfully established; enter message transmission loop - # while True optimizes throughput and relies on socket exceptions to break the loop context - try: - captured_drops = () - drop_payload = None - item = None - msg = None - msg_bytes = None - - while True: - # Harvest and process dropped counts - # SAFELY EXTRACT AND PROCESS ALL TRACKED OVERFLOWS CHRONOLOGICALLY - try: - with state.cv: - if 1 < len(state.dropped) or 0 < state.dropped[0]: - # Freeze and copy the entire sequence array out of shared memory - captured_drops = tuple(state.dropped) - # Re-instantiate the tracked array to a clean initial state instantly - state.dropped.clear() - state.dropped.append(0) - - # Convert captured thresholds into synthetic inline logs inside our local staging worker - for chunked_count in captured_drops: - if 0 < chunked_count: - drop_payload = _create_dropped_msg( - chunked_count, '_logging_handler_thread', 0, - ) - # Appending to the retry queue guarantees strict chronological reporting order - retry_queue.append(RetryItem(synthetic=True, msg=drop_payload)) - finally: - captured_drops = () - drop_payload = None - - # Grab from the main queue if the local transport staging queue is empty - # If the retry queue is empty, block and wait for a fresh log message - if not retry_queue: - try: - msg = state.queue.get(timeout=state.reconnect_delay) - except queue.Empty: - if state.closed.is_set(): - # The queue is empty and the handler has been explicitly closed. - # Break the transmission loop to allow the worker thread to exit cleanly. - break - continue - else: - retry_queue.append(RetryItem(synthetic=False, msg=msg)) - finally: - msg = None - - # Transmit head message and track task lifecycle states - try: - # Peek at the oldest message without popping it yet - item = retry_queue[0] - msg_bytes = encoder.msg_to_str(item.msg).encode() - - if common.CommType.UDP == state.comm_type: - s.send(msg_bytes) - else: - s.send(f'{len(msg_bytes)} '.encode() + msg_bytes) - except (TypeError, ValueError): - # Message structure failure: Drop item immediately and preserve connection state - logger.exception('Dropping un-convertible poison-pill log message') - _item_completed(retry_queue, state.queue, item) - except UnicodeEncodeError: - # String binary processing failure: Drop item immediately and preserve connection state - logger.exception('Dropping un-encodable Unicode log message string') - _item_completed(retry_queue, state.queue, item) - except Exception: - # On socket break, tear down this loop context cleanly. - # The current message remains cleanly preserved at index 0 of retry_queue. - # Because task_done() is skipped, state.queue.join() will continue to block. - # Connection lost or infrastructure network failure: Break out loop to trigger socket reconnect - # Element stays safe at index 0 of the retry_queue cache. - # After reconnecting we will try it again. - break - else: - _item_completed(retry_queue, state.queue, item) - finally: - # Clear references to optimize memory tracking - item = None - msg_bytes = None - finally: - # close the connection to avoid leaking it - with contextlib.suppress(Exception): - s.close() - - -class SyslogHandler(hat_syslog_handler_SyslogHandler): - """ - A process-safe wrapper for hat.syslog.handler.SyslogHandler. - - Bypasses immutable NamedTuple state constraints on fork boundaries, - avoids thread-lock corruption from os.fork(), and short-circuits - the time-blocking flush/close loops during a Huey graceful shutdown. - - A process-safe wrapper subclass for hat.syslog.handler.SyslogHandler. - - Bypasses immutable state limitations on fork boundaries, handles thread-lock - corruption resulting from os.fork(), and avoids blocking task-execution loops - during a graceful worker process shutdown. - """ - def __init__(self, host, port, comm_type, queue_size=1024, reconnect_delay=5, *args, **kwargs): - """Initializes the wrapper and neutralizes conflicting parent process states.""" - super().__init__(host, port, common.CommType.UDP, queue_size, reconnect_delay, *args, **kwargs) - - state = self._get_parent_attr('__state') - - if state: - state.closed.set() - thread = self._get_parent_attr('__thread') - if thread and thread.is_alive(): - with state.cv, contextlib.suppress(Exception): - state.cv.notify_all() - - # Initialize native, thread-safe sync tracking structures - self.__state = _ThreadState( - host=host, - port=port, - comm_type=self._determine_comm_type(comm_type), - queue=queue.Queue(maxsize=queue_size), - queue_size=queue_size, - reconnect_delay=reconnect_delay, - cv=threading.Condition(), - closed=threading.Event(), - dropped=list((0,)), - ) - - self.__thread = None - self._initial_pid = os.getpid() - self._closing = threading.Event() - - def _alive_thread(self): - """Thread-safe validator confirming background worker availability.""" - if not (self.__thread and self.__thread.is_alive()): - return False - - with self.__state.cv: - if self.__thread and self.__thread.is_alive(): - if hasattr(self.__thread, '_scoreboard') and self.__thread._scoreboard: - self.__thread._scoreboard.alive = (time.time(), time.monotonic_ns()) - return True - return False - - def _after_fork(self, current_pid=None): - """ - Intercepts the Unix process boundary skew. If a fork is identified, - it cleans old references and initializes fresh process-isolated primitives. - """ - - # Detect if we crossed the Unix fork boundary into Huey's process worker. - if current_pid is None: - current_pid = os.getpid() - - if current_pid == self._initial_pid: - # Return early when we have not forked. - return - - self._initial_pid = current_pid - - state = self.__state - new_state = _ThreadState( - host=state.host, - port=state.port, - comm_type=state.comm_type, - queue=queue.Queue(maxsize=state.queue_size), - queue_size=state.queue_size, - reconnect_delay=state.reconnect_delay, - cv=threading.Condition(), - closed=threading.Event(), - dropped=list((0,)), - ) - self.__state = new_state - self.__thread = None - - def _create_thread(self): - """Aligns process states and builds a clean worker thread context.""" - - self._after_fork() - - if self._closing.is_set() or self.__state.closed.is_set() or self._alive_thread(): - return - - initial = self.__thread is None - with self.__state.cv: - previous = self.__thread - - self.__thread = threading.Thread( - target=_logging_handler_thread, - args=(self.__state,), - daemon=True, - ) - - scoreboard = ThreadScoreboard() - self.__thread._scoreboard = scoreboard - - if initial: - scoreboard.alive = scoreboard.start - scoreboard.initialized = scoreboard.start - elif previous and hasattr(previous, '_scoreboard') and previous._scoreboard: - scoreboard.alive = previous._scoreboard.alive - scoreboard.initialized = previous._scoreboard.initialized - scoreboard.previous_start = previous._scoreboard.start - logger.debug(f'Created a replacement thread: {scoreboard=}') - - self.__thread.start() - - def _determine_comm_type(self, comm_type): - """Maps and cross-checks string connection descriptions to Enumeration definitions.""" - if isinstance(comm_type, str): - needle = comm_type - haystack = frozenset(common.CommType.__members__) - vary = lambda x: { - x, x.upper(), - x.casefold(), x.casefold().upper(), - x.lower(), x.lower().upper(), - } - try: - matched_elements = tuple(haystack.intersection(vary(needle))) - member = matched_elements[0] - return common.CommType[member] - except (IndexError, KeyError) as e: - raise ValueError(f'Specify a valid comm_type from this list: {list(haystack)}') from e - - if not isinstance(comm_type, common.CommType): - raise ValueError('Invalid comm_type argument') - - def _parent_class_name(self): - return hat_syslog_handler_SyslogHandler.__name__ - - def _mangled_name(self, attr_name): - return f'_{self._parent_class_name()}{attr_name}' - - def _get_parent_attr(self, attr_name): - """Computes and gets mangled attributes from the super class.""" - return getattr(self, self._mangled_name(attr_name), None) - - def _set_parent_attr(self, attr_name, value): - """Computes and sets mangled attributes on the super class.""" - setattr(self, self._mangled_name(attr_name), value) - - def emit(self, record): - """Enqueues new log records and guarantees active connection coverage.""" - if self._closing.is_set(): - with contextlib.suppress(Exception): - logger.handle(record) - return - - self._create_thread() - - if not self._alive_thread(): - with contextlib.suppress(Exception): - logger.handle(record) - return - - state = self.__state - if state.closed.is_set(): - self._closing.set() - logger.warning('Closed in emit') - with contextlib.suppress(Exception): - logger.handle(record) - return - - msg = _record_to_msg(record) - - try: - state.queue.put_nowait(msg) - except queue.Full: - # ACQUIRE LOCK ON MAIN THREAD BEFORE INCREMENTING COUNTER SLICES - with state.cv: - dropped_count = state.dropped[-1] - if 1_000_000 < dropped_count: - state.dropped.append(1) - else: - state.dropped[-1] = 1 + dropped_count - - logger.warning(f'Dropped a log message in emit due to buffer overflow: {msg.msg!r}') - with contextlib.suppress(Exception): - logger.handle(record) - - def flush(self): - """Blocks execution until the internal logging queue is empty.""" - self._create_thread() - - if not self._alive_thread(): - return - - state = self.__state - with contextlib.suppress(Exception): - state.queue.join() - - def close(self): - """ - Gracefully flushes the queue and terminates the background logging thread. - Cleans up the queue, flags the state as closed, and shuts down - the background thread without allowing new ones to be generated. - """ - - # Align/verify the background worker thread state immediately - self._closing.clear() - self._create_thread() - state = self.__state - if state.closed.is_set(): - # Only return early when the thread is alive - state.closed.clear() - self._create_thread() - self._closing.set() - - # The native queue.join() blockade is now perfectly synchronized with the internal - # tracking flow loop. It will block until retry_queue is 100% empty. - if self._alive_thread(): - logger.debug('Flushing logging queue in close') - with contextlib.suppress(Exception): - state.queue.join() - - # Immediately trip the closed flag to end the networking thread - self.__state.closed.set() - - # ===================================================================== - # DYNAMIC GRANDPARENT BYPASS VIA MRO - # Instructs Python to search for close() starting *after* our direct - # parent class type descriptor. This dynamically resolves grandfather - # dependencies while completely avoiding the parent class thread-joins. - # ===================================================================== - super(hat_syslog_handler_SyslogHandler, self).close() - - -if '__main__' == __name__: - import unittest - - class MockSyslogServer: - """Stands up an isolated local background socket server to harvest transport streams.""" - def __init__(self, host='127.0.0.1', port=0): - self.host = host - self.port = port - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.sock.bind((self.host, self.port)) - self.port = self.sock.getsockname()[1] - - self.received_messages = [] - self.running = threading.Event() - self._thread = None - - def start(self): - self.running.set() - self._thread = threading.Thread(target=self._listen_loop, daemon=True) - self._thread.start() - - def stop(self): - self.running.clear() - s = None - try: - s = socket.create_connection((self.host, self.port), timeout=0.1) - except Exception: - pass - finally: - if s is not None: - s.close() - if self._thread: - self._thread.join(timeout=1.0) - self.sock.close() - - def _listen_loop(self): - self.sock.listen(1) - while self.running.is_set(): - try: - conn, _ = self.sock.accept() - if not self.running.is_set(): - conn.close() - break - - with conn: - while self.running.is_set(): - data = conn.recv(4096) - if not data: - break - self.received_messages.append(data.decode('utf-8')) - except Exception: - break - - class TestSyslogHandlerIntegration(unittest.TestCase): - def setUp(self): - """Initializes the background mock network collection service before running assertions.""" - self.server = MockSyslogServer() - self.server.start() - - self.handler = SyslogHandler( - host=self.server.host, - port=self.server.port, - comm_type='tcp', - queue_size=10, - reconnect_delay=1, - ) - - self.test_logger = logging.getLogger('integration_test') - self.test_logger.setLevel(logging.DEBUG) - self.test_logger.addHandler(self.handler) - - def tearDown(self): - """Cleans up the network service topology profiles safely upon validation teardown.""" - with contextlib.suppress(Exception): - self.handler.close() - self.server.stop() - - def test_pipeline_delivery_and_flush(self): - """Verifies that items are completely delivered down the wire before flush unblocks.""" - self.test_logger.debug('Message A') - self.test_logger.info('Message B') - - start_time = time.monotonic() - self.handler.flush() - elapsed = time.monotonic() - start_time - - self.assertLess(elapsed, 2.0, 'The flush operations deadlocked the execution loop context') - self.assertTrue(any('Message A' in msg for msg in self.server.received_messages)) - self.assertTrue(any('Message B' in msg for msg in self.server.received_messages)) - - def test_graceful_close_lifecycle(self): - """Confirms that close drains remaining log states and tears down the worker thread.""" - self.test_logger.info('Shutdown Message') - self.handler.close() - self.assertTrue(any('Shutdown Message' in msg for msg in self.server.received_messages)) - - unittest.main() - diff --git a/tubesync/common/json.py b/tubesync/common/json_encoder.py similarity index 100% rename from tubesync/common/json.py rename to tubesync/common/json_encoder.py diff --git a/tubesync/common/logger.py b/tubesync/common/logger.py index 12d77c253..d04d878e8 100644 --- a/tubesync/common/logger.py +++ b/tubesync/common/logger.py @@ -1,22 +1,32 @@ import logging from django.conf import settings -##from .logging import default_handler, syslog_handler -from .utils import getenv +from .logs import app_logger, default_handler +##from .logs.syslog.std import default_handler as syslog_default_handler +from .logs.syslog.hat import ( + default_handler as hat_syslog_default_handler, + handler as hat_syslog_handler, +) -##if settings.DEBUG: -## default_handler.setLevel(logging.DEBUG) +log = app_logger - -app_name = getenv('DJANGO_SETTINGS_MODULE') -first_part = app_name.split('.', 1)[0] -log = app_logger = logging.getLogger(first_part) -##app_logger.propagate = False -##app_logger.addHandler(default_handler) -##app_logger.addHandler(syslog_handler) -app_logger.setLevel(logging.INFO) +default_handler.setLevel(logging.INFO) if settings.DEBUG: - app_logger.setLevel(logging.DEBUG) + default_handler.setLevel(logging.DEBUG) + +hat_syslog_tcp_handler = hat_syslog_handler( + host=hat_syslog_default_handler.host, + port=hat_syslog_default_handler.port, + comm_type='TCP', +) +hat_syslog_tcp_handler.setLevel(logging.DEBUG) +##if not settings.DEBUG: +## hat_syslog_tcp_handler.setLevel(logging.INFO) + +app_logger.propagate = False +app_logger.addHandler(default_handler) +##app_logger.addHandler(syslog_default_handler) +app_logger.addHandler(hat_syslog_tcp_handler) if ( hasattr(settings, 'DATABASES') and diff --git a/tubesync/common/logs/__init__.py b/tubesync/common/logs/__init__.py new file mode 100644 index 000000000..f83eea48a --- /dev/null +++ b/tubesync/common/logs/__init__.py @@ -0,0 +1,16 @@ +from . import syslog +from ._default import default_formatter, default_handler +from ._filters import RemoveSpecificLogFilter +from ._logger import app_logger, logger + + +logger = logger(__name__) + +__all__ = [ + 'app_logger', + 'default_formatter', + 'default_handler', + 'logger', + 'syslog', + 'RemoveSpecificLogFilter', +] diff --git a/tubesync/common/logs/_default.py b/tubesync/common/logs/_default.py new file mode 100644 index 000000000..d309f55ec --- /dev/null +++ b/tubesync/common/logs/_default.py @@ -0,0 +1,12 @@ +import logging + + +default_formatter = logging.Formatter( + '%(asctime)s [%(name)s/%(levelname)s] %(message)s' +) + +default_handler = logging.StreamHandler() +default_handler.setFormatter(default_formatter) +default_handler.setLevel(logging.INFO) + +__all__ = ['default_formatter', 'default_handler'] diff --git a/tubesync/common/logging.py b/tubesync/common/logs/_filters.py similarity index 78% rename from tubesync/common/logging.py rename to tubesync/common/logs/_filters.py index 0b73c29a5..1084e7997 100644 --- a/tubesync/common/logging.py +++ b/tubesync/common/logs/_filters.py @@ -1,5 +1,4 @@ import logging -from logging.handlers import SysLogHandler class RemoveSpecificLogFilter(logging.Filter): @@ -69,21 +68,7 @@ def filter(self, record): return False -default_formatter = logging.Formatter( - '%(asctime)s [%(name)s/%(levelname)s] %(message)s' -) -default_handler = logging.StreamHandler() -default_handler.setFormatter(default_formatter) -default_handler.setLevel(logging.INFO) - -syslog_formatter = logging.Formatter( - '%(asctime)s %(name)s: %(message)s', - '%b %d %H:%M:%S', -) -syslog_handler = SysLogHandler( - address='/dev/log', - facility=SysLogHandler.LOG_LOCAL0, -) -syslog_handler.setFormatter(syslog_formatter) -syslog_handler.setLevel(logging.DEBUG) +__all__ = [ + 'RemoveSpecificLogFilter', +] diff --git a/tubesync/common/logs/_logger.py b/tubesync/common/logs/_logger.py new file mode 100644 index 000000000..1be5008bb --- /dev/null +++ b/tubesync/common/logs/_logger.py @@ -0,0 +1,16 @@ +import logging +import os + + +logger = lambda name=None: logging.getLogger( + __name__.rsplit('.', 1)[0] if name is None else name +) + +app_logger = logger() +app_name = os.getenv('DJANGO_SETTINGS_MODULE', str()).strip() +if app_name: + first_part = app_name.split('.', 1)[0] + app_logger = logger(first_part) +app_logger.setLevel(logging.DEBUG) + +__all__ = ['app_logger', 'logger'] diff --git a/tubesync/common/logs/syslog/__init__.py b/tubesync/common/logs/syslog/__init__.py new file mode 100644 index 000000000..3a65e0707 --- /dev/null +++ b/tubesync/common/logs/syslog/__init__.py @@ -0,0 +1,2 @@ +from . import hat as hat +from . import std as std diff --git a/tubesync/common/logs/syslog/hat/__init__.py b/tubesync/common/logs/syslog/hat/__init__.py new file mode 100644 index 000000000..57f8134da --- /dev/null +++ b/tubesync/common/logs/syslog/hat/__init__.py @@ -0,0 +1,3 @@ +from ._default import * # noqa: F403 +from ._logger import logger as logger +logger = logger(__name__) diff --git a/tubesync/common/logs/syslog/hat/__main__.py b/tubesync/common/logs/syslog/hat/__main__.py new file mode 100644 index 000000000..2f0bd740d --- /dev/null +++ b/tubesync/common/logs/syslog/hat/__main__.py @@ -0,0 +1,106 @@ +import contextlib +import logging +import socket +import threading +import time +import unittest + +from ._default import handler + + +class MockSyslogServer: + """Stands up an isolated local background socket server to harvest transport streams.""" + def __init__(self, host='127.0.0.1', port=0): + self.host = host + self.port = port + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.bind((self.host, self.port)) + self.port = self.sock.getsockname()[1] + + self.received_messages = [] + self.running = threading.Event() + self._thread = None + + def start(self): + self.running.set() + self._thread = threading.Thread(target=self._listen_loop, daemon=True) + self._thread.start() + + def stop(self): + self.running.clear() + s = None + try: + s = socket.create_connection((self.host, self.port), timeout=0.1) + except Exception: + pass + finally: + if s is not None: + s.close() + if self._thread: + self._thread.join(timeout=1.0) + self.sock.close() + + def _listen_loop(self): + self.sock.listen(1) + while self.running.is_set(): + try: + conn, _ = self.sock.accept() + if not self.running.is_set(): + conn.close() + break + + with conn: + while self.running.is_set(): + data = conn.recv(4096) + if not data: + break + self.received_messages.append(data.decode('utf-8')) + except Exception: + break + +class TestSyslogHandlerIntegration(unittest.TestCase): + def setUp(self): + """Initializes the background mock network collection service before running assertions.""" + self.server = MockSyslogServer() + self.server.start() + + self.handler = handler( + host=self.server.host, + port=self.server.port, + comm_type='tcp', + queue_size=10, + reconnect_delay=1, + ) + + self.test_logger = logging.getLogger('integration_test') + self.test_logger.setLevel(logging.DEBUG) + self.test_logger.addHandler(self.handler) + + def tearDown(self): + """Cleans up the network service topology profiles safely upon validation teardown.""" + with contextlib.suppress(Exception): + self.handler.close() + self.server.stop() + + def test_pipeline_delivery_and_flush(self): + """Verifies that items are completely delivered down the wire before flush unblocks.""" + self.test_logger.debug('Message A') + self.test_logger.info('Message B') + + start_time = time.monotonic() + self.handler.flush() + elapsed = time.monotonic() - start_time + + self.assertLess(elapsed, 2.0, 'The flush operations deadlocked the execution loop context') + self.assertTrue(any('Message A' in msg for msg in self.server.received_messages)) + self.assertTrue(any('Message B' in msg for msg in self.server.received_messages)) + + def test_graceful_close_lifecycle(self): + """Confirms that close drains remaining log states and tears down the worker thread.""" + self.test_logger.info('Shutdown Message') + self.handler.close() + self.assertTrue(any('Shutdown Message' in msg for msg in self.server.received_messages)) + +unittest.main() diff --git a/tubesync/common/logs/syslog/hat/_default.py b/tubesync/common/logs/syslog/hat/_default.py new file mode 100644 index 000000000..a5f335d21 --- /dev/null +++ b/tubesync/common/logs/syslog/hat/_default.py @@ -0,0 +1,481 @@ +import collections +import contextlib +from dataclasses import dataclass, field +import logging +import os +import queue +import socket +import ssl +import threading +import time + +from typing import Optional, Tuple + +from ._logger import logger + +try: + from hat.syslog import common, encoder + from hat.syslog.handler import ( + SyslogHandler as hat_syslog_handler_SyslogHandler, + _ThreadState, + _create_dropped_msg, + _record_to_msg, + ) +except ImportError: + handler = None +else: + handler = True + +default_formatter = logging.Formatter() + +logger = logger() + +__all__ = ['default_formatter', 'default_handler', 'handler'] + +if not handler: + # Create only enough for tests to fail instead of creating hard to diagnose errors + from ..std import default_handler as std_default_handler, handler + class MockSyslogHandler(handler): + def __init__(self, host, port, comm_type, queue_size, reconnect_delay, *args, **kwargs): + self.host = host + self.port = port + self.comm_type = comm_type + self.queue_size = queue_size + self.reconnect_delay = reconnect_delay + args = () + kwargs = {} + kwargs['address'] = std_default_handler.address + kwargs['facility'] = std_default_handler.facility + super().__init__(*args, **kwargs) + handler = MockSyslogHandler + default_handler = handler('127.0.0.1', 6514, 'UDP', 1024, 5) +else: + _SOCKET_FACTORIES = { + common.CommType.TCP: lambda state, ctx: _create_tcp_socket(state), + common.CommType.TLS: lambda state, ctx: _create_tcp_socket(state, ctx), + common.CommType.UDP: lambda state, ctx: _create_udp_socket(state), + } + + + @dataclass(frozen=True) + class RetryItem: + """Encapsulates a structured syslog entry in the retry transport pipeline.""" + synthetic: bool + msg: common.Msg + + + @dataclass + class ThreadScoreboard: + """Tracks precision execution lifecycles and diagnostic markers for background workers.""" + start: Tuple[float, int] = field(default_factory=lambda: (time.time(), time.monotonic_ns())) + alive: Optional[Tuple[float, int]] = None + initialized: Optional[Tuple[float, int]] = None + previous_start: Optional[Tuple[float, int]] = None + + + def _create_tcp_socket(state, ctx=None): + """Establishes an optimized TCP or wrapped TLS stream transport connection.""" + s = socket.create_connection((state.host, state.port), timeout=5.0) + s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + if ctx: + s = ctx.wrap_socket(s) + return s + + + def _create_udp_socket(state): + """Establishes an un-bonded UDP datagram socket connection endpoint.""" + s = socket.socket(type=socket.SOCK_DGRAM) + s.connect((state.host, state.port)) + return s + + + def _item_completed(retry_queue, core_queue, item): + """ + Drains the processed item from the retry queue and issues a task acknowledgment + to the synchronized queue if the item was not synthetically created. + """ + # SUCCESS: Remove the successfully sent item from the chronological pipeline + retry_queue.popleft() + if not item.synthetic: + core_queue.task_done() + + + def _logging_handler_thread(state, logger=logger): + """ + Worker thread that drains messages and guarantees strict transport-level delivery + by checking stateless item origins packed inside RetryItem containers. + + Independent worker thread routine responsible for draining log messages + from the synchronized queue and streaming them to the remote hat-syslog endpoint. + + Uses an internal thread-local retry queue to maintain precise chronological + ordering of log messages during transport connection failures. + + Args: + state (_ThreadState): Thread-safe tracking state containing connection params. + logger (logging.Logger): Diagnostic logger instance for transport anomalies. + """ + ctx = None + if common.CommType.TLS == state.comm_type: + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False + ctx.verify_mode = ssl.VerifyMode.CERT_NONE + + # Chronological staging queue dedicated to maintaining strict FIFO order during outages + retry_queue = collections.deque() + + # Loop persists on shutdown until retry_queue is fully empty. + while retry_queue or not state.closed.is_set(): + # connect to the endpoint + s = None + try: + factory = _SOCKET_FACTORIES[state.comm_type] + s = factory(state, ctx) + except KeyError: + raise NotImplementedError(f'Unsupported comm_type: {state.comm_type}') + except Exception: + if s is not None: + with contextlib.suppress(Exception): + s.close() + time.sleep(state.reconnect_delay) + continue + + # Connection successfully established; enter message transmission loop + # while True optimizes throughput and relies on socket exceptions to break the loop context + try: + captured_drops = () + drop_payload = None + item = None + msg = None + msg_bytes = None + + while True: + # Harvest and process dropped counts + # SAFELY EXTRACT AND PROCESS ALL TRACKED OVERFLOWS CHRONOLOGICALLY + try: + with state.cv: + if 1 < len(state.dropped) or 0 < state.dropped[0]: + # Freeze and copy the entire sequence array out of shared memory + captured_drops = tuple(state.dropped) + # Re-instantiate the tracked array to a clean initial state instantly + state.dropped.clear() + state.dropped.append(0) + + # Convert captured thresholds into synthetic inline logs inside our local staging worker + for chunked_count in captured_drops: + if 0 < chunked_count: + drop_payload = _create_dropped_msg( + chunked_count, '_logging_handler_thread', 0, + ) + # Appending to the retry queue guarantees strict chronological reporting order + retry_queue.append(RetryItem(synthetic=True, msg=drop_payload)) + finally: + captured_drops = () + drop_payload = None + + # Grab from the main queue if the local transport staging queue is empty + # If the retry queue is empty, block and wait for a fresh log message + if not retry_queue: + try: + msg = state.queue.get(timeout=state.reconnect_delay) + except queue.Empty: + if state.closed.is_set(): + # The queue is empty and the handler has been explicitly closed. + # Break the transmission loop to allow the worker thread to exit cleanly. + break + continue + else: + retry_queue.append(RetryItem(synthetic=False, msg=msg)) + finally: + msg = None + + # Transmit head message and track task lifecycle states + try: + # Peek at the oldest message without popping it yet + item = retry_queue[0] + msg_bytes = encoder.msg_to_str(item.msg).encode() + + if common.CommType.UDP == state.comm_type: + s.send(msg_bytes) + else: + s.send(f'{len(msg_bytes)} '.encode() + msg_bytes) + except (TypeError, ValueError): + # Message structure failure: Drop item immediately and preserve connection state + logger.exception('Dropping un-convertible poison-pill log message') + _item_completed(retry_queue, state.queue, item) + except UnicodeEncodeError: + # String binary processing failure: Drop item immediately and preserve connection state + logger.exception('Dropping un-encodable Unicode log message string') + _item_completed(retry_queue, state.queue, item) + except Exception: + # On socket break, tear down this loop context cleanly. + # The current message remains cleanly preserved at index 0 of retry_queue. + # Because task_done() is skipped, state.queue.join() will continue to block. + # Connection lost or infrastructure network failure: Break out loop to trigger socket reconnect + # Element stays safe at index 0 of the retry_queue cache. + # After reconnecting we will try it again. + break + else: + _item_completed(retry_queue, state.queue, item) + finally: + # Clear references to optimize memory tracking + item = None + msg_bytes = None + finally: + # close the connection to avoid leaking it + with contextlib.suppress(Exception): + s.close() + + + class SyslogHandler(hat_syslog_handler_SyslogHandler): + """ + A process-safe wrapper for hat.syslog.handler.SyslogHandler. + + Bypasses immutable NamedTuple state constraints on fork boundaries, + avoids thread-lock corruption from os.fork(), and short-circuits + the time-blocking flush/close loops during a Huey graceful shutdown. + + A process-safe wrapper subclass for hat.syslog.handler.SyslogHandler. + + Bypasses immutable state limitations on fork boundaries, handles thread-lock + corruption resulting from os.fork(), and avoids blocking task-execution loops + during a graceful worker process shutdown. + """ + def __init__(self, host, port, comm_type, queue_size=1024, reconnect_delay=5, *args, **kwargs): + """Initializes the wrapper and neutralizes conflicting parent process states.""" + super().__init__(host, port, common.CommType.UDP, queue_size, reconnect_delay, *args, **kwargs) + + state = self._get_parent_attr('__state') + + if state: + state.closed.set() + thread = self._get_parent_attr('__thread') + if thread and thread.is_alive(): + with state.cv, contextlib.suppress(Exception): + state.cv.notify_all() + + # Initialize native, thread-safe sync tracking structures + self.__state = _ThreadState( + host=host, + port=port, + comm_type=self._determine_comm_type(comm_type), + queue=queue.Queue(maxsize=queue_size), + queue_size=queue_size, + reconnect_delay=reconnect_delay, + cv=threading.Condition(), + closed=threading.Event(), + dropped=list((0,)), + ) + + self.__thread = None + self._initial_pid = os.getpid() + self._closing = threading.Event() + + # Save the original arguments + self.host = host + self.port = port + self.comm_type = comm_type + self.queue_size = queue_size + self.reconnect_delay = reconnect_delay + + def _alive_thread(self): + """Thread-safe validator confirming background worker availability.""" + if not (self.__thread and self.__thread.is_alive()): + return False + + with self.__state.cv: + if self.__thread and self.__thread.is_alive(): + if hasattr(self.__thread, '_scoreboard') and self.__thread._scoreboard: + self.__thread._scoreboard.alive = (time.time(), time.monotonic_ns()) + return True + return False + + def _after_fork(self, current_pid=None): + """ + Intercepts the Unix process boundary skew. If a fork is identified, + it cleans old references and initializes fresh process-isolated primitives. + """ + + # Detect if we crossed the Unix fork boundary into Huey's process worker. + if current_pid is None: + current_pid = os.getpid() + + if current_pid == self._initial_pid: + # Return early when we have not forked. + return + + self._initial_pid = current_pid + + state = self.__state + new_state = _ThreadState( + host=state.host, + port=state.port, + comm_type=state.comm_type, + queue=queue.Queue(maxsize=state.queue_size), + queue_size=state.queue_size, + reconnect_delay=state.reconnect_delay, + cv=threading.Condition(), + closed=threading.Event(), + dropped=list((0,)), + ) + self.__state = new_state + self.__thread = None + + def _create_thread(self): + """Aligns process states and builds a clean worker thread context.""" + + self._after_fork() + + if self._closing.is_set() or self.__state.closed.is_set() or self._alive_thread(): + return + + initial = self.__thread is None + with self.__state.cv: + previous = self.__thread + + self.__thread = threading.Thread( + target=_logging_handler_thread, + args=(self.__state,), + daemon=True, + ) + + scoreboard = ThreadScoreboard() + self.__thread._scoreboard = scoreboard + + if initial: + scoreboard.alive = scoreboard.start + scoreboard.initialized = scoreboard.start + elif previous and hasattr(previous, '_scoreboard') and previous._scoreboard: + scoreboard.alive = previous._scoreboard.alive + scoreboard.initialized = previous._scoreboard.initialized + scoreboard.previous_start = previous._scoreboard.start + logger.debug(f'Created a replacement thread: {scoreboard=}') + + self.__thread.start() + + def _determine_comm_type(self, comm_type): + """Maps and cross-checks string connection descriptions to Enumeration definitions.""" + if isinstance(comm_type, str): + needle = comm_type + haystack = frozenset(common.CommType.__members__) + vary = lambda x: { + x, x.upper(), + x.casefold(), x.casefold().upper(), + x.lower(), x.lower().upper(), + } + try: + matched_elements = tuple(haystack.intersection(vary(needle))) + member = matched_elements[0] + return common.CommType[member] + except (IndexError, KeyError) as e: + raise ValueError(f'Specify a valid comm_type from this list: {list(haystack)}') from e + + if not isinstance(comm_type, common.CommType): + raise ValueError('Invalid comm_type argument') + + def _parent_class_name(self): + return hat_syslog_handler_SyslogHandler.__name__ + + def _mangled_name(self, attr_name): + return f'_{self._parent_class_name()}{attr_name}' + + def _get_parent_attr(self, attr_name): + """Computes and gets mangled attributes from the super class.""" + return getattr(self, self._mangled_name(attr_name), None) + + def _set_parent_attr(self, attr_name, value): + """Computes and sets mangled attributes on the super class.""" + setattr(self, self._mangled_name(attr_name), value) + + def emit(self, record): + """Enqueues new log records and guarantees active connection coverage.""" + if self._closing.is_set(): + with contextlib.suppress(Exception): + logger.handle(record) + return + + self._create_thread() + + if not self._alive_thread(): + with contextlib.suppress(Exception): + logger.handle(record) + return + + state = self.__state + if state.closed.is_set(): + self._closing.set() + logger.warning('Closed in emit') + with contextlib.suppress(Exception): + logger.handle(record) + return + + msg = _record_to_msg(record) + + try: + state.queue.put_nowait(msg) + except queue.Full: + # ACQUIRE LOCK ON MAIN THREAD BEFORE INCREMENTING COUNTER SLICES + with state.cv: + dropped_count = state.dropped[-1] + if 1_000_000 < dropped_count: + state.dropped.append(1) + else: + state.dropped[-1] = 1 + dropped_count + + logger.warning(f'Dropped a log message in emit due to buffer overflow: {msg.msg!r}') + with contextlib.suppress(Exception): + logger.handle(record) + + def flush(self): + """Blocks execution until the internal logging queue is empty.""" + self._create_thread() + + if not self._alive_thread(): + return + + state = self.__state + with contextlib.suppress(Exception): + state.queue.join() + + def close(self): + """ + Gracefully flushes the queue and terminates the background logging thread. + Cleans up the queue, flags the state as closed, and shuts down + the background thread without allowing new ones to be generated. + """ + + # Align/verify the background worker thread state immediately + self._closing.clear() + self._create_thread() + state = self.__state + if state.closed.is_set(): + # Only return early when the thread is alive + state.closed.clear() + self._create_thread() + self._closing.set() + + # The native queue.join() blockade is now perfectly synchronized with the internal + # tracking flow loop. It will block until retry_queue is 100% empty. + if self._alive_thread(): + logger.debug('Flushing logging queue in close') + with contextlib.suppress(Exception): + state.queue.join() + + # Immediately trip the closed flag to end the networking thread + self.__state.closed.set() + + # ===================================================================== + # DYNAMIC GRANDPARENT BYPASS VIA MRO + # Instructs Python to search for close() starting *after* our direct + # parent class type descriptor. This dynamically resolves grandfather + # dependencies while completely avoiding the parent class thread-joins. + # ===================================================================== + super(hat_syslog_handler_SyslogHandler, self).close() + + + handler = SyslogHandler + default_handler = handler( + comm_type='UDP', + host='127.0.0.1', + port=6514, + ) diff --git a/tubesync/common/logs/syslog/hat/_logger.py b/tubesync/common/logs/syslog/hat/_logger.py new file mode 100644 index 000000000..6e97ec439 --- /dev/null +++ b/tubesync/common/logs/syslog/hat/_logger.py @@ -0,0 +1,8 @@ +import logging + + +logger = lambda name=None: logging.getLogger( + __name__.rsplit('.', 1)[0] if name is None else name +) + +__all__ = ['logger'] diff --git a/tubesync/common/logs/syslog/std/__init__.py b/tubesync/common/logs/syslog/std/__init__.py new file mode 100644 index 000000000..57f8134da --- /dev/null +++ b/tubesync/common/logs/syslog/std/__init__.py @@ -0,0 +1,3 @@ +from ._default import * # noqa: F403 +from ._logger import logger as logger +logger = logger(__name__) diff --git a/tubesync/common/logs/syslog/std/_default.py b/tubesync/common/logs/syslog/std/_default.py new file mode 100644 index 000000000..e785fc330 --- /dev/null +++ b/tubesync/common/logs/syslog/std/_default.py @@ -0,0 +1,19 @@ +import logging +from logging.handlers import SysLogHandler + +handler = SysLogHandler +facility = SysLogHandler.LOG_LOCAL0 + +default_formatter = logging.Formatter( + '%(asctime)s %(name)s: %(message)s', + '%b %d %H:%M:%S', +) + +default_handler = handler( + address='/dev/log', + facility=facility, +) +default_handler.setFormatter(default_formatter) +default_handler.setLevel(logging.DEBUG) + +__all__ = ['default_formatter', 'default_handler', 'handler'] diff --git a/tubesync/common/logs/syslog/std/_logger.py b/tubesync/common/logs/syslog/std/_logger.py new file mode 100644 index 000000000..6e97ec439 --- /dev/null +++ b/tubesync/common/logs/syslog/std/_logger.py @@ -0,0 +1,8 @@ +import logging + + +logger = lambda name=None: logging.getLogger( + __name__.rsplit('.', 1)[0] if name is None else name +) + +__all__ = ['logger'] diff --git a/tubesync/common/migrations/0001_initial.py b/tubesync/common/migrations/0001_initial.py index 8bfe640d4..bae70beac 100644 --- a/tubesync/common/migrations/0001_initial.py +++ b/tubesync/common/migrations/0001_initial.py @@ -1,7 +1,7 @@ # Generated by Django 5.2.3 on 2025-06-30 07:11 -import common.json import django.utils.timezone +from common.json_encoder import JSONEncoder from django.db import migrations, models @@ -19,7 +19,7 @@ class Migration(migrations.Migration): ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), ('name', models.CharField(db_index=True, max_length=190)), ('task_id', models.CharField(db_index=True, max_length=40)), - ('task_params', models.JSONField(default=dict, encoder=common.json.JSONEncoder)), + ('task_params', models.JSONField(default=dict, encoder=JSONEncoder)), ('verbose_name', models.CharField(blank=True, max_length=255, null=True)), ('start_at', models.DateTimeField(blank=True, db_index=True, null=True)), ('end_at', models.DateTimeField(db_index=True, default=django.utils.timezone.now)), diff --git a/tubesync/common/models/tasks.py b/tubesync/common/models/tasks.py index b5dedf490..aa7154013 100644 --- a/tubesync/common/models/tasks.py +++ b/tubesync/common/models/tasks.py @@ -5,8 +5,8 @@ from django.db import connection, models, transaction from django.utils import timezone -from ..json import JSONEncoder -# from common.json import JSONEncoder +from ..json_encoder import JSONEncoder +# from common.json_encoder import JSONEncoder from ..utils import is_empty_iterator #from common.utils import is_empty_iterator diff --git a/tubesync/sync/management/commands/youtube-add-subscriptions.py b/tubesync/sync/management/commands/youtube-add-subscriptions.py index 17af85823..5be54429e 100644 --- a/tubesync/sync/management/commands/youtube-add-subscriptions.py +++ b/tubesync/sync/management/commands/youtube-add-subscriptions.py @@ -1,7 +1,7 @@ import json import urllib from django.core.management.base import BaseCommand, CommandError # noqa -from common.json import JSONEncoder +from common.json_encoder import JSONEncoder from sync.choices import Val, YouTube_SourceType # noqa from sync.models import Source diff --git a/tubesync/sync/management/commands/youtube-dl-info.py b/tubesync/sync/management/commands/youtube-dl-info.py index c8099765d..b00873244 100644 --- a/tubesync/sync/management/commands/youtube-dl-info.py +++ b/tubesync/sync/management/commands/youtube-dl-info.py @@ -1,7 +1,7 @@ import json from django.core.management.base import BaseCommand, CommandError # noqa from sync.youtube import get_media_info -from common.json import JSONEncoder +from common.json_encoder import JSONEncoder class Command(BaseCommand): diff --git a/tubesync/sync/migrations/0031_metadata_metadataformat.py b/tubesync/sync/migrations/0031_metadata_metadataformat.py index aee895182..932806c07 100644 --- a/tubesync/sync/migrations/0031_metadata_metadataformat.py +++ b/tubesync/sync/migrations/0031_metadata_metadataformat.py @@ -1,8 +1,8 @@ # Generated by Django 5.1.8 on 2025-04-11 07:36 -import common.json import django.db.models.deletion import uuid +from common.json_encoder import JSONEncoder from django.db import migrations, models @@ -23,7 +23,7 @@ class Migration(migrations.Migration): ('retrieved', models.DateTimeField(auto_now_add=True, db_index=True, help_text='Date and time the metadata was retrieved', verbose_name='retrieved')), ('uploaded', models.DateTimeField(help_text='Date and time the media was uploaded', null=True, verbose_name='uploaded')), ('published', models.DateTimeField(help_text='Date and time the media was published', null=True, verbose_name='published')), - ('value', models.JSONField(default=dict, encoder=common.json.JSONEncoder, help_text='JSON metadata object', verbose_name='value')), + ('value', models.JSONField(default=dict, encoder=JSONEncoder, help_text='JSON metadata object', verbose_name='value')), ('media', models.ForeignKey(help_text='Media the metadata belongs to', on_delete=django.db.models.deletion.CASCADE, related_name='metadata_media', to='sync.media')), ], options={ @@ -40,7 +40,7 @@ class Migration(migrations.Migration): ('key', models.CharField(blank=True, default='', help_text='Media identifier at the site for which this format is available', max_length=256, verbose_name='key')), ('number', models.PositiveIntegerField(help_text='Ordering number for this format', verbose_name='number')), ('code', models.CharField(blank=True, default='', help_text='Format identification code', max_length=64, verbose_name='code')), - ('value', models.JSONField(default=dict, encoder=common.json.JSONEncoder, help_text='JSON metadata format object', verbose_name='value')), + ('value', models.JSONField(default=dict, encoder=JSONEncoder, help_text='JSON metadata format object', verbose_name='value')), ('metadata', models.ForeignKey(help_text='Metadata the format belongs to', on_delete=django.db.models.deletion.CASCADE, related_name='metadataformat_metadata', to='sync.metadata')), ], options={ diff --git a/tubesync/sync/migrations/0031_squashed_metadata_metadataformat.py b/tubesync/sync/migrations/0031_squashed_metadata_metadataformat.py index c7a78bd81..3acde3a73 100644 --- a/tubesync/sync/migrations/0031_squashed_metadata_metadataformat.py +++ b/tubesync/sync/migrations/0031_squashed_metadata_metadataformat.py @@ -1,8 +1,8 @@ # Generated by Django 5.1.8 on 2025-04-23 18:10 -import common.json import django.db.models.deletion import uuid +from common.json_encoder import JSONEncoder from django.db import migrations, models @@ -25,7 +25,7 @@ class Migration(migrations.Migration): ('retrieved', models.DateTimeField(auto_now_add=True, db_index=True, help_text='Date and time the metadata was retrieved', verbose_name='retrieved')), ('uploaded', models.DateTimeField(db_index=True, help_text='Date and time the media was uploaded', null=True, verbose_name='uploaded')), ('published', models.DateTimeField(db_index=True, help_text='Date and time the media was published', null=True, verbose_name='published')), - ('value', models.JSONField(default=dict, encoder=common.json.JSONEncoder, help_text='JSON metadata object', verbose_name='value')), + ('value', models.JSONField(default=dict, encoder=JSONEncoder, help_text='JSON metadata object', verbose_name='value')), ('media', models.OneToOneField(help_text='Media the metadata belongs to', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='new_metadata', to='sync.media')), ], options={ @@ -43,7 +43,7 @@ class Migration(migrations.Migration): ('key', models.CharField(blank=True, db_index=True, default='', help_text='Media identifier at the site from which this format is available', max_length=256, verbose_name='key')), ('number', models.PositiveIntegerField(help_text='Ordering number for this format', verbose_name='number')), ('code', models.CharField(blank=True, default='', help_text='Format identification code', max_length=64, verbose_name='code')), - ('value', models.JSONField(default=dict, encoder=common.json.JSONEncoder, help_text='JSON metadata format object', verbose_name='value')), + ('value', models.JSONField(default=dict, encoder=JSONEncoder, help_text='JSON metadata format object', verbose_name='value')), ('metadata', models.ForeignKey(help_text='Metadata the format belongs to', on_delete=django.db.models.deletion.CASCADE, related_name='format', to='sync.metadata')), ], options={ diff --git a/tubesync/sync/migrations/0033_alter_mediaserver_options_alter_source_source_acodec_and_more.py b/tubesync/sync/migrations/0033_alter_mediaserver_options_alter_source_source_acodec_and_more.py index 46ea113f5..31e85cc1a 100644 --- a/tubesync/sync/migrations/0033_alter_mediaserver_options_alter_source_source_acodec_and_more.py +++ b/tubesync/sync/migrations/0033_alter_mediaserver_options_alter_source_source_acodec_and_more.py @@ -1,6 +1,6 @@ # Generated by Django 5.1.9 on 2025-05-10 06:18 -import common.json +from common.json_encoder import JSONEncoder from django.db import migrations, models @@ -14,7 +14,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name='mediaserver', name='options', - field=models.JSONField(encoder=common.json.JSONEncoder, help_text='Options for the media server', null=True, verbose_name='options'), + field=models.JSONField(encoder=JSONEncoder, help_text='Options for the media server', null=True, verbose_name='options'), ), migrations.AlterField( model_name='source', diff --git a/tubesync/sync/models/media.py b/tubesync/sync/models/media.py index e4d288a5e..3f2a3d4a1 100644 --- a/tubesync/sync/models/media.py +++ b/tubesync/sync/models/media.py @@ -15,7 +15,7 @@ from django.utils.translation import gettext_lazy as _ from common.logger import log from common.errors import NoFormatException -from common.json import JSONEncoder +from common.json_encoder import JSONEncoder from common.utils import ( clean_filename, clean_emoji, directory_and_stem, glob_quote, mkdir_p, seconds_to_timestr, diff --git a/tubesync/sync/models/media_server.py b/tubesync/sync/models/media_server.py index 74502facf..7c4cea386 100644 --- a/tubesync/sync/models/media_server.py +++ b/tubesync/sync/models/media_server.py @@ -1,4 +1,4 @@ -from common.json import JSONEncoder +from common.json_encoder import JSONEncoder from django import db from django.utils.translation import gettext_lazy as _ from ..choices import Val, MediaServerType diff --git a/tubesync/sync/models/metadata.py b/tubesync/sync/models/metadata.py index 637f2d9a8..12a362efc 100644 --- a/tubesync/sync/models/metadata.py +++ b/tubesync/sync/models/metadata.py @@ -1,5 +1,5 @@ import uuid -from common.json import JSONEncoder +from common.json_encoder import JSONEncoder from common.timestamp import timestamp_to_datetime from common.utils import django_queryset_generator as qs_gen from django import db diff --git a/tubesync/sync/models/metadata_format.py b/tubesync/sync/models/metadata_format.py index c116575be..b10427414 100644 --- a/tubesync/sync/models/metadata_format.py +++ b/tubesync/sync/models/metadata_format.py @@ -1,5 +1,5 @@ import uuid -from common.json import JSONEncoder +from common.json_encoder import JSONEncoder from django import db from django.utils.translation import gettext_lazy as _ from .metadata import Metadata diff --git a/tubesync/tubesync/local_settings.py.container b/tubesync/tubesync/local_settings.py.container index 0800f055b..85d3e358f 100644 --- a/tubesync/tubesync/local_settings.py.container +++ b/tubesync/tubesync/local_settings.py.container @@ -1,5 +1,3 @@ -import logging -import sys from pathlib import Path from urllib.parse import urljoin from common.utils import getenv, parse_database_connection_string diff --git a/tubesync/tubesync/local_settings.py.example b/tubesync/tubesync/local_settings.py.example index 08dfabf68..3dc1f729d 100644 --- a/tubesync/tubesync/local_settings.py.example +++ b/tubesync/tubesync/local_settings.py.example @@ -1,4 +1,5 @@ from pathlib import Path +from common.logs import syslog BASE_DIR = Path(__file__).resolve().parent.parent @@ -6,9 +7,84 @@ CONFIG_BASE_DIR = BASE_DIR DOWNLOADS_BASE_DIR = BASE_DIR +DEBUG = False SECRET_KEY = 'example-secret-key' +LOGGING = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'default': {}, + 'syslog': { + 'format': '%(asctime)s %(name)s: %(message)s', + 'datefmt': '%b %d %H:%M:%S', + }, + 'common': { + 'format': '%(asctime)s [%(name)s/%(levelname)s] %(message)s', + 'datefmt': None, + }, + 'consumer_simple': { + 'format': '%(asctime)s %(message)s', + 'datefmt': '%H:%M:%S', + }, + 'worker_process': { + 'format': '[%(asctime)s] %(levelname)s:%(name)s:%(process)d:%(message)s', + 'datefmt': None, + }, + 'worker_thread': { + 'format': '[%(asctime)s] %(levelname)s:%(name)s:%(process)d:%(threadName)s:%(message)s', + 'datefmt': None, + }, + }, + 'handlers': { + 'stderr': { + 'class': 'logging.StreamHandler', + 'level': 'DEBUG' if DEBUG else 'INFO', + 'formatter': 'common', + }, + 'stderr_worker_process': { + 'class': 'logging.StreamHandler', + 'level': 'INFO' if DEBUG else 'WARNING', + 'formatter': 'worker_process', + }, + 'stderr_worker_thread': { + 'class': 'logging.StreamHandler', + 'level': 'INFO' if DEBUG else 'WARNING', + 'formatter': 'worker_thread', + }, + 'syslog': { + 'class': syslog.std.handler, + 'address': syslog.std.default_handler.address, + 'facility': syslog.std.default_handler.facility, + 'level': 'DEBUG', + 'formatter': 'syslog', + }, + }, + 'root': { + 'handlers': ['syslog', 'stderr'], + 'level': 'DEBUG', + }, + 'loggers': { + 'huey': { + 'handlers': ['syslog', 'stderr_worker_thread'], + 'level': 'DEBUG', + 'propagate': False, + }, + 'huey.consumer.worker.process': { + 'handlers': ['syslog', 'stderr_worker_process'], + 'level': 'DEBUG', + 'propagate': False, + }, + 'huey.consumer.worker.thread': { + 'handlers': ['syslog', 'stderr_worker_thread'], + 'level': 'DEBUG', + 'propagate': False, + }, + }, +} + + DATABASES = { 'default': { 'ENGINE': 'django.db.backends.sqlite3', diff --git a/tubesync/tubesync/settings.py b/tubesync/tubesync/settings.py index dbbc05928..b4a31ba34 100644 --- a/tubesync/tubesync/settings.py +++ b/tubesync/tubesync/settings.py @@ -1,7 +1,7 @@ from django import VERSION as DJANGO_VERSION -from logging.handlers import SysLogHandler from pathlib import Path from common.huey import sqlite_tasks +from common.logs import syslog from common.utils import getenv from sync.choices import TaskQueue @@ -79,31 +79,31 @@ 'disable_existing_loggers': False, 'filters': { 'drop_huey_scheduler_checking_periodic_tasks': { - '()': 'common.logging.RemoveSpecificLogFilter', + '()': 'common.logs.RemoveSpecificLogFilter', 'func_name': 'enqueue_periodic_tasks', 'level': 'DEBUG', 'msg_starts_with': 'Checking periodic tasks', }, 'drop_huey_scheduler_sleep': { - '()': 'common.logging.RemoveSpecificLogFilter', + '()': 'common.logs.RemoveSpecificLogFilter', 'func_name': 'sleep_for_interval', 'level': 'DEBUG', 'msg_starts_with': 'Sleeping for ', }, 'drop_huey_scheduler_checking_worker_health': { - '()': 'common.logging.RemoveSpecificLogFilter', + '()': 'common.logs.RemoveSpecificLogFilter', 'func_name': 'check_worker_health', 'level': 'DEBUG', 'msg_starts_with': 'Checking worker health.', }, 'drop_huey_scheduler_scheduler_is_up': { - '()': 'common.logging.RemoveSpecificLogFilter', + '()': 'common.logs.RemoveSpecificLogFilter', 'func_name': 'check_worker_health', 'level': 'DEBUG', 'msg_starts_with': 'Scheduler is up and running.', }, 'drop_huey_scheduler_workers_are_up': { - '()': 'common.logging.RemoveSpecificLogFilter', + '()': 'common.logs.RemoveSpecificLogFilter', 'func_name': 'check_worker_health', 'level': 'DEBUG', 'msg_starts_with': 'Workers are up and running.', @@ -134,7 +134,7 @@ }, 'handlers': { 'hat_syslog': { - 'class': 'common.huey_syslog.SyslogHandler', + 'class': syslog.hat.handler, 'host': '127.0.0.1', 'port': 6514, 'comm_type': 'TCP', @@ -142,7 +142,7 @@ 'formatter': 'default', }, 'hat_syslog_worker_process': { - 'class': 'common.huey_syslog.SyslogHandler', + 'class': syslog.hat.handler, 'host': '127.0.0.1', 'port': 6514, 'comm_type': 'TCP', @@ -150,7 +150,7 @@ 'formatter': 'worker_process', }, 'hat_syslog_worker_thread': { - 'class': 'common.huey_syslog.SyslogHandler', + 'class': syslog.hat.handler, 'host': '127.0.0.1', 'port': 6514, 'comm_type': 'TCP', @@ -173,9 +173,9 @@ 'formatter': 'worker_thread', }, 'syslog': { - 'class': SysLogHandler, - 'address': '/dev/log', - 'facility': SysLogHandler.LOG_LOCAL0, + 'class': syslog.std.handler, + 'address': syslog.std.default_handler.address, + 'facility': syslog.std.default_handler.facility, 'level': 'DEBUG', 'formatter': 'syslog', }, @@ -185,7 +185,7 @@ 'level': 'DEBUG', }, 'loggers': { - 'common.huey_syslog': { + 'common.logs.syslog.hat': { 'handlers': ['syslog'], 'level': 'DEBUG', 'propagate': False,