From 06d9906e0829b48ee373fcc0ef3aba59e9f6693b Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil Date: Thu, 26 Feb 2026 13:19:03 +0100 Subject: [PATCH 1/5] fix(DISET): reject all connections during throttling to prevent queue growth The previous throttle mechanism accepted one connection every 0.25s even while the service was overloaded. When threads were stuck (e.g. blocked on DB queries or deadlocked), each accepted connection added to the already-full queue, making recovery impossible. Now all incoming connections are rejected while wantsThrottle is True, with a brief sleep to avoid busy-spinning. This prevents the self-reinforcing stuck state where the queue grows faster than it can drain. --- src/DIRAC/Core/DISET/ServiceReactor.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/DIRAC/Core/DISET/ServiceReactor.py b/src/DIRAC/Core/DISET/ServiceReactor.py index c3f7862be1a..f2aa01a7d26 100644 --- a/src/DIRAC/Core/DISET/ServiceReactor.py +++ b/src/DIRAC/Core/DISET/ServiceReactor.py @@ -200,7 +200,6 @@ def __acceptIncomingConnection(self, svcName=False): services at the same time """ sel = self.__getListeningSelector(svcName) - throttleExpires = None while self.__alive: clientTransport = None try: @@ -224,16 +223,13 @@ def __acceptIncomingConnection(self, svcName=False): gLogger.warn(f"Client connected from banned ip {clientIP}") clientTransport.close() continue - # Handle throttling - if self.__services[svcName].wantsThrottle and throttleExpires is None: - throttleExpires = time.time() + THROTTLE_SERVICE_SLEEP_SECONDS - if throttleExpires: - if time.time() > throttleExpires: - throttleExpires = None - else: - gLogger.warn("Rejecting client due to throttling", str(clientTransport.getRemoteAddress())) - clientTransport.close() - continue + # Handle throttling: reject all connections while overloaded + # to prevent queue growth when threads are stuck + if self.__services[svcName].wantsThrottle: + gLogger.warn("Rejecting client due to throttling", str(clientTransport.getRemoteAddress())) + clientTransport.close() + time.sleep(THROTTLE_SERVICE_SLEEP_SECONDS) + continue # Handle connection self.__stats.connectionStablished() self.__services[svcName].handleConnection(clientTransport) From 0bcc9e165e8394703af0d4bb7ecd5d24eedff5c7 Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil Date: Thu, 26 Feb 2026 13:21:16 +0100 Subject: [PATCH 2/5] feat(DISET): add throttle duration tracking and diagnostic logging Track when throttling starts and log state transitions with queue/thread diagnostics. Periodic warnings are emitted every 30s while throttling persists, making it easier to diagnose stuck services from logs without needing to attach a debugger. Adds Service.throttleDiagnostics() to expose queue size, max queue, active threads, and max threads for logging purposes. --- src/DIRAC/Core/DISET/ServiceReactor.py | 27 ++++++++++++++++++++++++- src/DIRAC/Core/DISET/private/Service.py | 9 +++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/DIRAC/Core/DISET/ServiceReactor.py b/src/DIRAC/Core/DISET/ServiceReactor.py index f2aa01a7d26..d51b3b83e40 100644 --- a/src/DIRAC/Core/DISET/ServiceReactor.py +++ b/src/DIRAC/Core/DISET/ServiceReactor.py @@ -35,6 +35,9 @@ #: This sleep is repeated for as long as Service.wantsThrottle is truthy THROTTLE_SERVICE_SLEEP_SECONDS = 0.25 +#: Interval between periodic throttle warning messages (seconds) +THROTTLE_LOG_INTERVAL_SECONDS = 30 + class ServiceReactor: __transportExtraKeywords = { @@ -200,6 +203,8 @@ def __acceptIncomingConnection(self, svcName=False): services at the same time """ sel = self.__getListeningSelector(svcName) + throttleStartedAt = None + lastThrottleLog = 0 while self.__alive: clientTransport = None try: @@ -226,10 +231,30 @@ def __acceptIncomingConnection(self, svcName=False): # Handle throttling: reject all connections while overloaded # to prevent queue growth when threads are stuck if self.__services[svcName].wantsThrottle: - gLogger.warn("Rejecting client due to throttling", str(clientTransport.getRemoteAddress())) + now = time.time() + if throttleStartedAt is None: + throttleStartedAt = now + diag = self.__services[svcName].throttleDiagnostics() + gLogger.warn( + f"Service {svcName} entering throttle mode", + f"queue={diag['queue']}/{diag['maxQueue']}, " f"threads={diag['threads']}/{diag['maxThreads']}", + ) + lastThrottleLog = now + elif now - lastThrottleLog >= THROTTLE_LOG_INTERVAL_SECONDS: + duration = now - throttleStartedAt + diag = self.__services[svcName].throttleDiagnostics() + gLogger.warn( + f"Service {svcName} still throttling after {duration:.0f}s", + f"queue={diag['queue']}/{diag['maxQueue']}, " f"threads={diag['threads']}/{diag['maxThreads']}", + ) + lastThrottleLog = now clientTransport.close() time.sleep(THROTTLE_SERVICE_SLEEP_SECONDS) continue + if throttleStartedAt is not None: + duration = time.time() - throttleStartedAt + gLogger.info(f"Service {svcName} throttle cleared after {duration:.1f}s") + throttleStartedAt = None # Handle connection self.__stats.connectionStablished() self.__services[svcName].handleConnection(clientTransport) diff --git a/src/DIRAC/Core/DISET/private/Service.py b/src/DIRAC/Core/DISET/private/Service.py index 25b2058900c..e02c977ac73 100644 --- a/src/DIRAC/Core/DISET/private/Service.py +++ b/src/DIRAC/Core/DISET/private/Service.py @@ -286,6 +286,15 @@ def wantsThrottle(self): nQueued = self._threadPool._work_queue.qsize() return nQueued > self._cfg.getMaxWaitingPetitions() + def throttleDiagnostics(self): + """Return a dict of diagnostics useful for throttle logging""" + return { + "queue": self._threadPool._work_queue.qsize(), + "maxQueue": self._cfg.getMaxWaitingPetitions(), + "threads": len(self._threadPool._threads), + "maxThreads": self._cfg.getMaxThreads(), + } + # Threaded process function def _processInThread(self, clientTransport): """ From 6c44a2e90fab2ad99668a5729d4dc00bb7a0712c Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil Date: Thu, 26 Feb 2026 13:22:16 +0100 Subject: [PATCH 3/5] feat(DISET): auto-restart service after configurable max throttle duration When a service is stuck in throttle mode (all threads blocked, queue full), it cannot recover without external intervention. Add a configurable MaxThrottleDuration CS option (default: 0 = disabled) that triggers a process exit after the specified number of seconds of continuous throttling. The process supervisor (e.g. runsv) then restarts the service cleanly, clearing all stuck state. When enabled, a FATAL log message is emitted before exit with full queue/thread diagnostics for post-mortem analysis. --- src/DIRAC/Core/DISET/ServiceReactor.py | 21 ++++++++++++++++--- .../DISET/private/ServiceConfiguration.py | 12 +++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/DIRAC/Core/DISET/ServiceReactor.py b/src/DIRAC/Core/DISET/ServiceReactor.py index d51b3b83e40..6086e139c57 100644 --- a/src/DIRAC/Core/DISET/ServiceReactor.py +++ b/src/DIRAC/Core/DISET/ServiceReactor.py @@ -244,16 +244,31 @@ def __acceptIncomingConnection(self, svcName=False): duration = now - throttleStartedAt diag = self.__services[svcName].throttleDiagnostics() gLogger.warn( - f"Service {svcName} still throttling after {duration:.0f}s", - f"queue={diag['queue']}/{diag['maxQueue']}, " f"threads={diag['threads']}/{diag['maxThreads']}", + f"Service {svcName} still throttling", + f"duration={duration:.0f}s, queue={diag['queue']}/{diag['maxQueue']}, " + f"threads={diag['threads']}/{diag['maxThreads']}", ) lastThrottleLog = now + # Check if throttle has exceeded the maximum allowed duration + maxThrottleDuration = self.__services[svcName].getConfig().getMaxThrottleDuration() + if maxThrottleDuration > 0 and (now - throttleStartedAt) > maxThrottleDuration: + diag = self.__services[svcName].throttleDiagnostics() + gLogger.fatal( + f"Service {svcName} stuck in throttle, initiating process restart", + f"duration={now - throttleStartedAt:.0f}s (limit: {maxThrottleDuration}s), " + f"queue={diag['queue']}/{diag['maxQueue']}, " + f"threads={diag['threads']}/{diag['maxThreads']}", + ) + clientTransport.close() + self.__alive = False + return + gLogger.warn("Rejecting client due to throttling", str(clientTransport.getRemoteAddress())) clientTransport.close() time.sleep(THROTTLE_SERVICE_SLEEP_SECONDS) continue if throttleStartedAt is not None: duration = time.time() - throttleStartedAt - gLogger.info(f"Service {svcName} throttle cleared after {duration:.1f}s") + gLogger.info(f"Service {svcName} throttle cleared", f"duration={duration:.1f}s") throttleStartedAt = None # Handle connection self.__stats.connectionStablished() diff --git a/src/DIRAC/Core/DISET/private/ServiceConfiguration.py b/src/DIRAC/Core/DISET/private/ServiceConfiguration.py index 500cf7e1a12..4ea0e7e0c86 100755 --- a/src/DIRAC/Core/DISET/private/ServiceConfiguration.py +++ b/src/DIRAC/Core/DISET/private/ServiceConfiguration.py @@ -121,6 +121,18 @@ def getURL(self): self.setURL(serviceURL) return serviceURL + def getMaxThrottleDuration(self): + """Maximum seconds a service can remain in throttle mode before triggering a restart. + + Set to 0 to disable auto-restart (default). + When the throttle duration exceeds this value, the service process exits + to allow the process supervisor (e.g. runsv) to restart it cleanly. + """ + try: + return int(self.getOption("MaxThrottleDuration")) + except Exception: + return 0 + def getContextLifeTime(self): optionValue = self.getOption("ContextLifeTime") try: From 195b95ca7160871692b70b32813535d2d0de5075 Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil Date: Thu, 26 Feb 2026 13:42:57 +0100 Subject: [PATCH 4/5] refactor(DISET): remove throttle sleep to speed up rejection and recovery The 0.25s sleep between throttle rejections was a leftover from the old "accept 1 every 0.25s" mechanism. With the new reject-all approach, sel.select(timeout=10) already rate-limits when no connections arrive. Removing the sleep lets the service reject incoming connections immediately (faster client failover) and re-check wantsThrottle sooner (faster recovery when queue drains). --- src/DIRAC/Core/DISET/ServiceReactor.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/DIRAC/Core/DISET/ServiceReactor.py b/src/DIRAC/Core/DISET/ServiceReactor.py index 6086e139c57..64fcb8f470e 100644 --- a/src/DIRAC/Core/DISET/ServiceReactor.py +++ b/src/DIRAC/Core/DISET/ServiceReactor.py @@ -31,10 +31,6 @@ from DIRAC.ConfigurationSystem.Client.Helpers import Registry from DIRAC.ConfigurationSystem.Client import PathFinder -#: Time during which the service does not accept new requests and handles those in the queue, if the backlog is too large -#: This sleep is repeated for as long as Service.wantsThrottle is truthy -THROTTLE_SERVICE_SLEEP_SECONDS = 0.25 - #: Interval between periodic throttle warning messages (seconds) THROTTLE_LOG_INTERVAL_SECONDS = 30 @@ -264,7 +260,6 @@ def __acceptIncomingConnection(self, svcName=False): return gLogger.warn("Rejecting client due to throttling", str(clientTransport.getRemoteAddress())) clientTransport.close() - time.sleep(THROTTLE_SERVICE_SLEEP_SECONDS) continue if throttleStartedAt is not None: duration = time.time() - throttleStartedAt From 1a435ecd7a6c8ae7b854b79c0b985a64661c4b5f Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil Date: Thu, 26 Feb 2026 13:46:38 +0100 Subject: [PATCH 5/5] refactor(DISET): move throttle state tracking into Service Throttle state (start time, last log time) was local to __acceptIncomingConnection, which resets on every re-entry from serve() (e.g. after a 10s select timeout). This meant MaxThrottleDuration could never accumulate past 10s if connections arrived slowly. Move tracking into Service instance state alongside wantsThrottle, which now handles entry/exit logging and periodic diagnostics. Add throttleDuration property for the reactor to check against MaxThrottleDuration. The ServiceReactor throttle block is simplified to just rejection + max duration check. --- src/DIRAC/Core/DISET/ServiceReactor.py | 41 +++++---------------- src/DIRAC/Core/DISET/private/Service.py | 48 +++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 35 deletions(-) diff --git a/src/DIRAC/Core/DISET/ServiceReactor.py b/src/DIRAC/Core/DISET/ServiceReactor.py index 64fcb8f470e..ac1dd931478 100644 --- a/src/DIRAC/Core/DISET/ServiceReactor.py +++ b/src/DIRAC/Core/DISET/ServiceReactor.py @@ -31,9 +31,6 @@ from DIRAC.ConfigurationSystem.Client.Helpers import Registry from DIRAC.ConfigurationSystem.Client import PathFinder -#: Interval between periodic throttle warning messages (seconds) -THROTTLE_LOG_INTERVAL_SECONDS = 30 - class ServiceReactor: __transportExtraKeywords = { @@ -199,8 +196,6 @@ def __acceptIncomingConnection(self, svcName=False): services at the same time """ sel = self.__getListeningSelector(svcName) - throttleStartedAt = None - lastThrottleLog = 0 while self.__alive: clientTransport = None try: @@ -225,33 +220,17 @@ def __acceptIncomingConnection(self, svcName=False): clientTransport.close() continue # Handle throttling: reject all connections while overloaded - # to prevent queue growth when threads are stuck - if self.__services[svcName].wantsThrottle: - now = time.time() - if throttleStartedAt is None: - throttleStartedAt = now - diag = self.__services[svcName].throttleDiagnostics() - gLogger.warn( - f"Service {svcName} entering throttle mode", - f"queue={diag['queue']}/{diag['maxQueue']}, " f"threads={diag['threads']}/{diag['maxThreads']}", - ) - lastThrottleLog = now - elif now - lastThrottleLog >= THROTTLE_LOG_INTERVAL_SECONDS: - duration = now - throttleStartedAt - diag = self.__services[svcName].throttleDiagnostics() - gLogger.warn( - f"Service {svcName} still throttling", - f"duration={duration:.0f}s, queue={diag['queue']}/{diag['maxQueue']}, " - f"threads={diag['threads']}/{diag['maxThreads']}", - ) - lastThrottleLog = now + # to prevent queue growth when threads are stuck. + # wantsThrottle also handles state tracking and diagnostic logging. + svc = self.__services[svcName] + if svc.wantsThrottle: # Check if throttle has exceeded the maximum allowed duration - maxThrottleDuration = self.__services[svcName].getConfig().getMaxThrottleDuration() - if maxThrottleDuration > 0 and (now - throttleStartedAt) > maxThrottleDuration: - diag = self.__services[svcName].throttleDiagnostics() + maxThrottleDuration = svc.getConfig().getMaxThrottleDuration() + if maxThrottleDuration > 0 and svc.throttleDuration > maxThrottleDuration: + diag = svc.throttleDiagnostics() gLogger.fatal( f"Service {svcName} stuck in throttle, initiating process restart", - f"duration={now - throttleStartedAt:.0f}s (limit: {maxThrottleDuration}s), " + f"duration={svc.throttleDuration:.0f}s (limit: {maxThrottleDuration}s), " f"queue={diag['queue']}/{diag['maxQueue']}, " f"threads={diag['threads']}/{diag['maxThreads']}", ) @@ -261,10 +240,6 @@ def __acceptIncomingConnection(self, svcName=False): gLogger.warn("Rejecting client due to throttling", str(clientTransport.getRemoteAddress())) clientTransport.close() continue - if throttleStartedAt is not None: - duration = time.time() - throttleStartedAt - gLogger.info(f"Service {svcName} throttle cleared", f"duration={duration:.1f}s") - throttleStartedAt = None # Handle connection self.__stats.connectionStablished() self.__services[svcName].handleConnection(clientTransport) diff --git a/src/DIRAC/Core/DISET/private/Service.py b/src/DIRAC/Core/DISET/private/Service.py index e02c977ac73..65f47622316 100644 --- a/src/DIRAC/Core/DISET/private/Service.py +++ b/src/DIRAC/Core/DISET/private/Service.py @@ -32,6 +32,10 @@ from DIRAC.FrameworkSystem.Client.SecurityLogClient import SecurityLogClient +#: Interval between periodic throttle warning messages (seconds) +THROTTLE_LOG_INTERVAL_SECONDS = 30 + + class Service: SVC_VALID_ACTIONS = {"RPC": "export", "FileTransfer": "transfer", "Message": "msg", "Connection": "Message"} SVC_SECLOG_CLIENT = SecurityLogClient() @@ -62,6 +66,8 @@ def __init__(self, serviceData): self._transportPool = getGlobalTransportPool() self.__cloneId = 0 self.__maxFD = 0 + self._throttleStartedAt = None + self._lastThrottleLog = 0 self.activityMonitoring = False # Check if monitoring is enabled if "Monitoring" in Operations().getMonitoringBackends(monitoringType="ServiceMonitoring"): @@ -282,9 +288,47 @@ def err_handler(result): @property def wantsThrottle(self): - """Boolean property for if the service wants requests to stop being accepted""" + """Boolean property for if the service wants requests to stop being accepted. + + Also maintains throttle duration tracking: records when throttling + started and logs state transitions and periodic diagnostics. + Returns True if the service should reject incoming connections. + """ nQueued = self._threadPool._work_queue.qsize() - return nQueued > self._cfg.getMaxWaitingPetitions() + shouldThrottle = nQueued > self._cfg.getMaxWaitingPetitions() + + now = time.time() + if shouldThrottle: + if self._throttleStartedAt is None: + self._throttleStartedAt = now + self._lastThrottleLog = now + diag = self.throttleDiagnostics() + gLogger.warn( + f"Service {self._name} entering throttle mode", + f"queue={diag['queue']}/{diag['maxQueue']}, " f"threads={diag['threads']}/{diag['maxThreads']}", + ) + elif now - self._lastThrottleLog >= THROTTLE_LOG_INTERVAL_SECONDS: + duration = now - self._throttleStartedAt + diag = self.throttleDiagnostics() + gLogger.warn( + f"Service {self._name} still throttling", + f"duration={duration:.0f}s, queue={diag['queue']}/{diag['maxQueue']}, " + f"threads={diag['threads']}/{diag['maxThreads']}", + ) + self._lastThrottleLog = now + elif self._throttleStartedAt is not None: + duration = now - self._throttleStartedAt + gLogger.info(f"Service {self._name} throttle cleared", f"duration={duration:.1f}s") + self._throttleStartedAt = None + + return shouldThrottle + + @property + def throttleDuration(self): + """Seconds the service has been continuously throttling, or 0 if not throttling.""" + if self._throttleStartedAt is None: + return 0 + return time.time() - self._throttleStartedAt def throttleDiagnostics(self): """Return a dict of diagnostics useful for throttle logging"""