Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ DIRAC_ROOT_PATH
If set, overwrites the value of DIRAC.rootPath.
Useful for using a non-standard location for `etc/dirac.cfg`, `runit/`, `startup/`, etc.

DIRAC_FAST_PROCESS_POOL
If ``true`` or ``yes``, enables faster pacing in RequestExecutingAgent
and ProcessPool by removing legacy enqueue/result-processing sleeps (default, ``no``).

DIRACSYSCONFIG
If set, its value should be (the full locations on the file system of) one of more DIRAC cfg file(s) (comma separated), whose content will be used for the DIRAC configuration
(see :ref:`dirac-cs-structure`)
Expand Down
41 changes: 25 additions & 16 deletions src/DIRAC/Core/Utilities/ProcessPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ def S_ERROR(mess):
sLog = gLogger.getSubLogger(__name__)


FAST_PROCESS_POOL = os.environ.get("DIRAC_FAST_PROCESS_POOL", "no").lower() in ["true", "yes"]


class WorkingProcess(multiprocessing.Process):
"""
.. class:: WorkingProcess
Expand Down Expand Up @@ -650,6 +653,14 @@ def __resultsQueueApproxSize(self):
except NotImplementedError:
return 0

@property
def __pendingQueueApproxSize(self):
# qsize fails if sem_getvalue is not available for the current platform
try:
return self.__pendingQueue.qsize()
except NotImplementedError:
return 0

def stopProcessing(self, timeout=10):
"""
Case fire
Expand Down Expand Up @@ -814,8 +825,9 @@ def queueTask(self, task, blocking=True, usePoolCallbacks=False):
self.__prListLock.release()

self.__spawnNeededWorkingProcesses()
# throttle a bit to allow task state propagation
time.sleep(0.1)
if not FAST_PROCESS_POOL:
# Preserve previous pacing unless explicitly disabled.
time.sleep(0.1)
return S_OK()

def createAndQueueTask(
Expand Down Expand Up @@ -856,7 +868,7 @@ def hasPendingTasks(self):
:warning: results may be misleading if elements put into the queue are big

"""
return not self.__pendingQueue.empty()
return self.__pendingQueueApproxSize > 0

def isFull(self):
"""
Expand All @@ -875,7 +887,7 @@ def isWorking(self):

:param self: self reference
"""
return not self.__pendingQueue.empty() or self.getNumWorkingProcesses()
return self.__pendingQueueApproxSize > 0 or self.getNumWorkingProcesses()

def processResults(self):
"""
Expand All @@ -897,11 +909,15 @@ def processResults(self):
start = time.time()
self.__cleanDeadProcesses()
log.debug("__cleanDeadProcesses", f"t={time.time() - start:.2f}")
if not self.__pendingQueue.empty():
if self.__pendingQueueApproxSize:
self.__spawnNeededWorkingProcesses()
log.debug("__spawnNeededWorkingProcesses", f"t={time.time() - start:.2f}")
time.sleep(0.1)
if self.__resultsQueue.empty():
if not FAST_PROCESS_POOL:
# Preserve previous pacing unless explicitly disabled.
time.sleep(0.1)
try:
task = self.__resultsQueue.get(block=False)
except queue.Empty:
if self.__resultsQueueApproxSize:
log.warn("Results queue is empty but has non zero size", "%d" % self.__resultsQueueApproxSize)
# We only commit suicide if we reach a backlog greater than the maximum number of workers
Expand All @@ -912,13 +928,6 @@ def processResults(self):
if processed == 0:
log.debug("Process results, but queue is empty...")
break
# In principle, there should be a task right away. However,
# queue.empty can't be trusted (https://docs.python.org/3/library/queue.html#queue.Queue.empty)
try:
task = self.__resultsQueue.get(timeout=10)
except queue.Empty:
log.warn("Queue.empty lied to us again...")
return 0

log.debug("__resultsQueue.get", f"t={time.time() - start:.2f}")
# execute callbacks
Expand Down Expand Up @@ -949,7 +958,7 @@ def processAllResults(self, timeout=10):
:param self: self reference
"""
start = time.time()
while self.getNumWorkingProcesses() or not self.__pendingQueue.empty():
while self.getNumWorkingProcesses() or self.__pendingQueueApproxSize:
self.processResults()
time.sleep(1)
if time.time() - start > timeout:
Expand Down Expand Up @@ -1041,7 +1050,7 @@ def __backgroundProcess(self):
if self.__draining:
return
self.processResults()
time.sleep(1)
time.sleep(0.1 if FAST_PROCESS_POOL else 1)

def __del__(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
from DIRAC.Core.Utilities import Network, TimeUtilities
from DIRAC.Core.Utilities.DErrno import cmpError
from DIRAC.Core.Utilities.ProcessPool import ProcessPool
from DIRAC.Core.Utilities.ProcessPool import ProcessPool, FAST_PROCESS_POOL
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.private.RequestTask import RequestTask
Expand Down Expand Up @@ -394,8 +394,9 @@ def execute(self):

# # update request counter
taskCounter += 1
# # task created, a little time kick to proceed
time.sleep(0.1)
if not FAST_PROCESS_POOL:
# Preserve previous pacing unless explicitly disabled.
time.sleep(0.1)
break

self.log.info("Flushing callbacks", f"({len(self.__requestCache)} requests still in cache)")
Expand Down
Loading