diff --git a/src/DIRAC/Core/DISET/ServiceReactor.py b/src/DIRAC/Core/DISET/ServiceReactor.py index c3f7862be1a..ac1dd931478 100644 --- a/src/DIRAC/Core/DISET/ServiceReactor.py +++ b/src/DIRAC/Core/DISET/ServiceReactor.py @@ -31,10 +31,6 @@ from DIRAC.ConfigurationSystem.Client.Helpers import Registry from DIRAC.ConfigurationSystem.Client import PathFinder -#: Time during which the service does not accept new requests and handles those in the queue, if the backlog is too large -#: This sleep is repeated for as long as Service.wantsThrottle is truthy -THROTTLE_SERVICE_SLEEP_SECONDS = 0.25 - class ServiceReactor: __transportExtraKeywords = { @@ -200,7 +196,6 @@ def __acceptIncomingConnection(self, svcName=False): services at the same time """ sel = self.__getListeningSelector(svcName) - throttleExpires = None while self.__alive: clientTransport = None try: @@ -224,16 +219,27 @@ def __acceptIncomingConnection(self, svcName=False): gLogger.warn(f"Client connected from banned ip {clientIP}") clientTransport.close() continue - # Handle throttling - if self.__services[svcName].wantsThrottle and throttleExpires is None: - throttleExpires = time.time() + THROTTLE_SERVICE_SLEEP_SECONDS - if throttleExpires: - if time.time() > throttleExpires: - throttleExpires = None - else: - gLogger.warn("Rejecting client due to throttling", str(clientTransport.getRemoteAddress())) + # Handle throttling: reject all connections while overloaded + # to prevent queue growth when threads are stuck. + # wantsThrottle also handles state tracking and diagnostic logging. + svc = self.__services[svcName] + if svc.wantsThrottle: + # Check if throttle has exceeded the maximum allowed duration + maxThrottleDuration = svc.getConfig().getMaxThrottleDuration() + if maxThrottleDuration > 0 and svc.throttleDuration > maxThrottleDuration: + diag = svc.throttleDiagnostics() + gLogger.fatal( + f"Service {svcName} stuck in throttle, initiating process restart", + f"duration={svc.throttleDuration:.0f}s (limit: {maxThrottleDuration}s), " + f"queue={diag['queue']}/{diag['maxQueue']}, " + f"threads={diag['threads']}/{diag['maxThreads']}", + ) clientTransport.close() - continue + self.__alive = False + return + gLogger.warn("Rejecting client due to throttling", str(clientTransport.getRemoteAddress())) + clientTransport.close() + continue # Handle connection self.__stats.connectionStablished() self.__services[svcName].handleConnection(clientTransport) diff --git a/src/DIRAC/Core/DISET/private/Service.py b/src/DIRAC/Core/DISET/private/Service.py index 25b2058900c..65f47622316 100644 --- a/src/DIRAC/Core/DISET/private/Service.py +++ b/src/DIRAC/Core/DISET/private/Service.py @@ -32,6 +32,10 @@ from DIRAC.FrameworkSystem.Client.SecurityLogClient import SecurityLogClient +#: Interval between periodic throttle warning messages (seconds) +THROTTLE_LOG_INTERVAL_SECONDS = 30 + + class Service: SVC_VALID_ACTIONS = {"RPC": "export", "FileTransfer": "transfer", "Message": "msg", "Connection": "Message"} SVC_SECLOG_CLIENT = SecurityLogClient() @@ -62,6 +66,8 @@ def __init__(self, serviceData): self._transportPool = getGlobalTransportPool() self.__cloneId = 0 self.__maxFD = 0 + self._throttleStartedAt = None + self._lastThrottleLog = 0 self.activityMonitoring = False # Check if monitoring is enabled if "Monitoring" in Operations().getMonitoringBackends(monitoringType="ServiceMonitoring"): @@ -282,9 +288,56 @@ def err_handler(result): @property def wantsThrottle(self): - """Boolean property for if the service wants requests to stop being accepted""" + """Boolean property for if the service wants requests to stop being accepted. + + Also maintains throttle duration tracking: records when throttling + started and logs state transitions and periodic diagnostics. + Returns True if the service should reject incoming connections. + """ nQueued = self._threadPool._work_queue.qsize() - return nQueued > self._cfg.getMaxWaitingPetitions() + shouldThrottle = nQueued > self._cfg.getMaxWaitingPetitions() + + now = time.time() + if shouldThrottle: + if self._throttleStartedAt is None: + self._throttleStartedAt = now + self._lastThrottleLog = now + diag = self.throttleDiagnostics() + gLogger.warn( + f"Service {self._name} entering throttle mode", + f"queue={diag['queue']}/{diag['maxQueue']}, " f"threads={diag['threads']}/{diag['maxThreads']}", + ) + elif now - self._lastThrottleLog >= THROTTLE_LOG_INTERVAL_SECONDS: + duration = now - self._throttleStartedAt + diag = self.throttleDiagnostics() + gLogger.warn( + f"Service {self._name} still throttling", + f"duration={duration:.0f}s, queue={diag['queue']}/{diag['maxQueue']}, " + f"threads={diag['threads']}/{diag['maxThreads']}", + ) + self._lastThrottleLog = now + elif self._throttleStartedAt is not None: + duration = now - self._throttleStartedAt + gLogger.info(f"Service {self._name} throttle cleared", f"duration={duration:.1f}s") + self._throttleStartedAt = None + + return shouldThrottle + + @property + def throttleDuration(self): + """Seconds the service has been continuously throttling, or 0 if not throttling.""" + if self._throttleStartedAt is None: + return 0 + return time.time() - self._throttleStartedAt + + def throttleDiagnostics(self): + """Return a dict of diagnostics useful for throttle logging""" + return { + "queue": self._threadPool._work_queue.qsize(), + "maxQueue": self._cfg.getMaxWaitingPetitions(), + "threads": len(self._threadPool._threads), + "maxThreads": self._cfg.getMaxThreads(), + } # Threaded process function def _processInThread(self, clientTransport): diff --git a/src/DIRAC/Core/DISET/private/ServiceConfiguration.py b/src/DIRAC/Core/DISET/private/ServiceConfiguration.py index 500cf7e1a12..4ea0e7e0c86 100755 --- a/src/DIRAC/Core/DISET/private/ServiceConfiguration.py +++ b/src/DIRAC/Core/DISET/private/ServiceConfiguration.py @@ -121,6 +121,18 @@ def getURL(self): self.setURL(serviceURL) return serviceURL + def getMaxThrottleDuration(self): + """Maximum seconds a service can remain in throttle mode before triggering a restart. + + Set to 0 to disable auto-restart (default). + When the throttle duration exceeds this value, the service process exits + to allow the process supervisor (e.g. runsv) to restart it cleanly. + """ + try: + return int(self.getOption("MaxThrottleDuration")) + except Exception: + return 0 + def getContextLifeTime(self): optionValue = self.getOption("ContextLifeTime") try: