diff --git a/docs/source/AdministratorGuide/ServerInstallations/environment_variable_configuration.rst b/docs/source/AdministratorGuide/ServerInstallations/environment_variable_configuration.rst index 2576f7e5216..9f35dc57bcb 100644 --- a/docs/source/AdministratorGuide/ServerInstallations/environment_variable_configuration.rst +++ b/docs/source/AdministratorGuide/ServerInstallations/environment_variable_configuration.rst @@ -64,6 +64,10 @@ DIRAC_ROOT_PATH If set, overwrites the value of DIRAC.rootPath. Useful for using a non-standard location for `etc/dirac.cfg`, `runit/`, `startup/`, etc. +DIRAC_FAST_PROCESS_POOL + If ``true`` or ``yes``, enables faster pacing in RequestExecutingAgent + and ProcessPool by removing legacy enqueue/result-processing sleeps (default: ``no``). + DIRACSYSCONFIG If set, its value should be (the full locations on the file system of) one of more DIRAC cfg file(s) (comma separated), whose content will be used for the DIRAC configuration (see :ref:`dirac-cs-structure`) diff --git a/src/DIRAC/Core/Utilities/ProcessPool.py b/src/DIRAC/Core/Utilities/ProcessPool.py index ad86fa65788..23959d26fe6 100644 --- a/src/DIRAC/Core/Utilities/ProcessPool.py +++ b/src/DIRAC/Core/Utilities/ProcessPool.py @@ -131,6 +131,9 @@ def S_ERROR(mess): sLog = gLogger.getSubLogger(__name__) +FAST_PROCESS_POOL = os.environ.get("DIRAC_FAST_PROCESS_POOL", "no").lower() in ["true", "yes"] + + class WorkingProcess(multiprocessing.Process): """ ..
class:: WorkingProcess @@ -650,6 +653,14 @@ def __resultsQueueApproxSize(self): except NotImplementedError: return 0 + @property + def __pendingQueueApproxSize(self): + # qsize fails if sem_getvalue is not available for the current platform + try: + return self.__pendingQueue.qsize() + except NotImplementedError: + return 0 + def stopProcessing(self, timeout=10): """ Case fire @@ -814,8 +825,9 @@ def queueTask(self, task, blocking=True, usePoolCallbacks=False): self.__prListLock.release() self.__spawnNeededWorkingProcesses() - # throttle a bit to allow task state propagation - time.sleep(0.1) + if not FAST_PROCESS_POOL: + # Preserve previous pacing unless explicitly disabled. + time.sleep(0.1) return S_OK() def createAndQueueTask( @@ -856,7 +868,7 @@ def hasPendingTasks(self): :warning: results may be misleading if elements put into the queue are big """ - return not self.__pendingQueue.empty() + return self.__pendingQueueApproxSize > 0 def isFull(self): """ @@ -875,7 +887,7 @@ def isWorking(self): :param self: self reference """ - return not self.__pendingQueue.empty() or self.getNumWorkingProcesses() + return self.__pendingQueueApproxSize > 0 or self.getNumWorkingProcesses() def processResults(self): """ @@ -897,11 +909,15 @@ def processResults(self): start = time.time() self.__cleanDeadProcesses() log.debug("__cleanDeadProcesses", f"t={time.time() - start:.2f}") - if not self.__pendingQueue.empty(): + if self.__pendingQueueApproxSize: self.__spawnNeededWorkingProcesses() log.debug("__spawnNeededWorkingProcesses", f"t={time.time() - start:.2f}") - time.sleep(0.1) - if self.__resultsQueue.empty(): + if not FAST_PROCESS_POOL: + # Preserve previous pacing unless explicitly disabled. 
+ time.sleep(0.1) + try: + task = self.__resultsQueue.get(block=False) + except queue.Empty: if self.__resultsQueueApproxSize: log.warn("Results queue is empty but has non zero size", "%d" % self.__resultsQueueApproxSize) # We only commit suicide if we reach a backlog greater than the maximum number of workers @@ -912,13 +928,6 @@ def processResults(self): if processed == 0: log.debug("Process results, but queue is empty...") break - # In principle, there should be a task right away. However, - # queue.empty can't be trusted (https://docs.python.org/3/library/queue.html#queue.Queue.empty) - try: - task = self.__resultsQueue.get(timeout=10) - except queue.Empty: - log.warn("Queue.empty lied to us again...") - return 0 log.debug("__resultsQueue.get", f"t={time.time() - start:.2f}") # execute callbacks @@ -949,7 +958,7 @@ def processAllResults(self, timeout=10): :param self: self reference """ start = time.time() - while self.getNumWorkingProcesses() or not self.__pendingQueue.empty(): + while self.getNumWorkingProcesses() or self.__pendingQueueApproxSize: self.processResults() time.sleep(1) if time.time() - start > timeout: @@ -1041,7 +1050,7 @@ def __backgroundProcess(self): if self.__draining: return self.processResults() - time.sleep(1) + time.sleep(0.1 if FAST_PROCESS_POOL else 1) def __del__(self): """ diff --git a/src/DIRAC/RequestManagementSystem/Agent/RequestExecutingAgent.py b/src/DIRAC/RequestManagementSystem/Agent/RequestExecutingAgent.py index cef3b669470..87618a6739b 100644 --- a/src/DIRAC/RequestManagementSystem/Agent/RequestExecutingAgent.py +++ b/src/DIRAC/RequestManagementSystem/Agent/RequestExecutingAgent.py @@ -39,7 +39,7 @@ from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler from DIRAC.Core.Utilities import Network, TimeUtilities from DIRAC.Core.Utilities.DErrno import cmpError -from DIRAC.Core.Utilities.ProcessPool import ProcessPool +from DIRAC.Core.Utilities.ProcessPool import ProcessPool, FAST_PROCESS_POOL from 
DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient from DIRAC.RequestManagementSystem.private.RequestTask import RequestTask @@ -394,8 +394,9 @@ def execute(self): # # update request counter taskCounter += 1 - # # task created, a little time kick to proceed - time.sleep(0.1) + if not FAST_PROCESS_POOL: + # Preserve previous pacing unless explicitly disabled. + time.sleep(0.1) break self.log.info("Flushing callbacks", f"({len(self.__requestCache)} requests still in cache)")