From c23c53e59e878490f3a08115d333d2445060ed36 Mon Sep 17 00:00:00 2001 From: aldbr Date: Tue, 10 Mar 2026 10:58:32 +0100 Subject: [PATCH] fix: getting cpu work left from a single source of truth --- .../Agent/JobAgent.py | 41 ++---- .../Agent/test/Test_Agent_JobAgent.py | 20 ++- .../Client/CPUNormalization.py | 101 +++++++------ .../Client/test/Test_CPUNormalization.py | 137 ++++++++++++++++++ .../JobWrapper/Watchdog.py | 87 ++++++----- .../JobWrapper/test/Test_Watchdog.py | 73 +++++++++- 6 files changed, 320 insertions(+), 139 deletions(-) create mode 100644 src/DIRAC/WorkloadManagementSystem/Client/test/Test_CPUNormalization.py diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py index 15caddd74a8..ced175dbe99 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py @@ -27,7 +27,6 @@ from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient from DIRAC.RequestManagementSystem.Client.Request import Request from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator -from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory from DIRAC.WorkloadManagementSystem.Client import JobStatus, PilotStatus from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient @@ -81,10 +80,9 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None): self.defaultWrapperLocation = "DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py" # Timeleft - self.initTimes = os.times() self.initTimeLeft = 0.0 self.timeLeft = self.initTimeLeft - self.timeLeftUtil = None + self.initTime = time.time() self.pilotInfoReportedFlag = False # Attributes related to the processed jobs, it should take the following form: @@ -109,16 +107,11 @@ def initialize(self): if not 
result["OK"]: return result - result = self._getCEDict(self.computingElement) - if not result["OK"]: - return result - ceDict = result["Value"][0] - - self.initTimeLeft = ceDict.get("CPUTime", self.initTimeLeft) - self.initTimeLeft = gConfig.getValue("/Resources/Computing/CEDefaults/MaxCPUTime", self.initTimeLeft) + # Read initial CPU work left from config (seeded by pilot via dirac-wms-get-queue-cpu-time) + self.initTimeLeft = gConfig.getValue("/LocalSite/CPUTimeLeft", self.initTimeLeft) self.timeLeft = self.initTimeLeft - self.initTimes = os.times() + self.initTime = time.time() # Localsite options self.siteName = siteName() self.pilotReference = gConfig.getValue("/LocalSite/PilotReference", self.pilotReference) @@ -136,9 +129,6 @@ def initialize(self): self.logLevel = self.am_getOption("DefaultLogLevel", self.logLevel) self.defaultWrapperLocation = self.am_getOption("JobWrapperTemplate", self.defaultWrapperLocation) - # Utilities - self.timeLeftUtil = TimeLeft() - # Some innerCEs may want to make use of CGroup2 support, so we prepare it globally here res = CG2Manager().setUp() if res["OK"]: @@ -403,24 +393,19 @@ def _checkCEAvailability(self, computingElement): return S_OK() ############################################################################# - def _computeCPUWorkLeft(self, processors=1): + def _computeCPUWorkLeft(self): """ - Compute CPU Work Left in hepspec06 seconds + Compute CPU Work Left in hepspec06 seconds. + + Uses a simple wall-clock countdown from the initial value (seeded by the pilot + via dirac-wms-get-queue-cpu-time). The elapsed wall-clock time is multiplied by + the CPU normalization factor to get the consumed CPU work. - :param int processors: number of processors available :return: cpu work left (cpu time left * cpu power of the cpus) """ - # Sum all times but the last one (elapsed_time) and remove times at init (is this correct?) 
- cpuTimeConsumed = sum(os.times()[:-1]) - sum(self.initTimes[:-1]) - result = self.timeLeftUtil.getTimeLeft(cpuTimeConsumed, processors) - if not result["OK"]: - self.log.warn("There were errors calculating time left using the Timeleft utility", result["Message"]) - self.log.warn("The time left will be calculated using os.times() and the info in our possession") - self.log.info(f"Current raw CPU time consumed is {cpuTimeConsumed}") - if self.cpuFactor: - return self.initTimeLeft - cpuTimeConsumed * self.cpuFactor - return self.timeLeft - return result["Value"] + elapsed = time.time() - self.initTime + cpuWorkConsumed = elapsed * self.cpuFactor + return self.initTimeLeft - cpuWorkConsumed def _checkCPUWorkLeft(self, cpuWorkLeft): """Check that fillingMode is enabled and time left is sufficient to continue the execution""" diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py index 7954fde3287..c504bacd2f0 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py @@ -11,7 +11,6 @@ from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error from DIRAC import S_ERROR, S_OK, gLogger -from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory from DIRAC.Resources.Computing.test.Test_PoolComputingElement import badJobScript, jobScript from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent @@ -150,28 +149,27 @@ def test__checkCEAvailability(mocker, ceType, mockCEReply, expectedResult): @pytest.mark.parametrize( - "initTimeLeft, timeLeft, cpuFactor, mockTimeLeftReply, expectedTimeLeft", + "initTimeLeft, cpuFactor, elapsedSeconds, expectedTimeLeft", [ - (100000, 75000, None, {"OK": False, "Message": "Error"}, 75000), - (100000, 75000, 10, 
{"OK": False, "Message": "Error"}, 100000), - (100000, 75000, 10, {"OK": True, "Value": 25000}, 25000), + # No CPU factor: no work consumed, time left equals initial + (100000, 0, 100, 100000), + # With CPU factor: elapsed * cpuFactor is subtracted from initTimeLeft + (100000, 10, 100, 99000), + # Longer elapsed time + (100000, 10, 5000, 50000), ], ) -def test__computeCPUWorkLeft(mocker, initTimeLeft, timeLeft, cpuFactor, mockTimeLeftReply, expectedTimeLeft): +def test__computeCPUWorkLeft(mocker, initTimeLeft, cpuFactor, elapsedSeconds, expectedTimeLeft): """Test JobAgent()._computeCPUWorkLeft()""" mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") - mocker.patch( - "DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.TimeLeft.getTimeLeft", return_value=mockTimeLeftReply - ) jobAgent = JobAgent("Test", "Test1") jobAgent.log = gLogger jobAgent.log.setLevel("DEBUG") - jobAgent.timeLeftUtil = TimeLeft() jobAgent.initTimeLeft = initTimeLeft - jobAgent.timeLeft = timeLeft jobAgent.cpuFactor = cpuFactor + jobAgent.initTime = time.time() - elapsedSeconds result = jobAgent._computeCPUWorkLeft() assert abs(result - expectedTimeLeft) < 10 diff --git a/src/DIRAC/WorkloadManagementSystem/Client/CPUNormalization.py b/src/DIRAC/WorkloadManagementSystem/Client/CPUNormalization.py index ad61696ed49..8c25084a04d 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/CPUNormalization.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/CPUNormalization.py @@ -14,15 +14,11 @@ def getCPUTime(cpuNormalizationFactor): - """Trying to get CPUTime left for execution (in seconds). + """Compute the initial CPUTime left for execution (in seconds). - It will first look to get the work left looking for batch system information useing the TimeLeft utility. - If it succeeds, it will convert it in real second, and return it. - - If it fails, it tries to get it from the static info found in CS. 
- If it fails, it returns the default, which is a large 9999999, that we may consider as "Infinite". - - This is a generic method, independent from the middleware of the resource if TimeLeft doesn't return a value + This is called at pilot bootstrap (via dirac-wms-get-queue-cpu-time) to seed + the initial CPUTimeLeft value. It queries the batch system first, then falls + back to static CS configuration. args: cpuNormalizationFactor (float): the CPU power of the current Worker Node. @@ -31,55 +27,58 @@ def getCPUTime(cpuNormalizationFactor): returns: cpuTimeLeft (int): the CPU time left, in seconds """ - cpuTimeLeft = 0.0 - cpuWorkLeft = gConfig.getValue("/LocalSite/CPUTimeLeft", 0) - - if not cpuWorkLeft: - # Try and get the information from the CPU left utility - result = TimeLeft().getTimeLeft() - if result["OK"]: - cpuWorkLeft = result["Value"] - - if cpuWorkLeft > 0: - # This is in HS06sseconds - # We need to convert in real seconds - if not cpuNormalizationFactor: # if cpuNormalizationFactor passed in is 0, try get it from the local cfg + + # 1. Try to compute time left from the batch system (sacct, qstat, etc.) + result = TimeLeft().getTimeLeft() + if result["OK"]: + cpuWorkLeft = result["Value"] + # Batch system answered — trust it, even if 0 + if not cpuNormalizationFactor: cpuNormalizationFactor = gConfig.getValue("/LocalSite/CPUNormalizationFactor", 0.0) if cpuNormalizationFactor: - cpuTimeLeft = cpuWorkLeft / cpuNormalizationFactor + return int(cpuWorkLeft / cpuNormalizationFactor) + return 0 - if not cpuTimeLeft: - # now we know that we have to find the CPUTimeLeft by looking in the CS - # this is not granted to be correct as the CS units may not be real seconds - gridCE = gConfig.getValue("/LocalSite/GridCE") - ceQueue = gConfig.getValue("/LocalSite/CEQueue") - if not ceQueue: - # we have to look for a ceQueue in the CS - # A bit hacky. 
We should better profit from something generic - gLogger.warn("No CEQueue in local configuration, looking to find one in CS") - siteName = DIRAC.siteName() - queueSection = f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Queues" - res = gConfig.getSections(queueSection) - if not res["OK"]: - raise RuntimeError(res["Message"]) - queues = res["Value"] - cpuTimes = [gConfig.getValue(queueSection + "/" + queue + "/maxCPUTime", 9999999.0) for queue in queues] - # These are (real, wall clock) minutes - damn BDII! + cpuTimeLeft = 0.0 + + # 2. Fall back to queue configuration in the CS. + # These values are wall-clock minutes from BDII, so we convert to seconds. + gridCE = gConfig.getValue("/LocalSite/GridCE") + ceQueue = gConfig.getValue("/LocalSite/CEQueue") + if not ceQueue: + # we have to look for a ceQueue in the CS + # A bit hacky. We should better profit from something generic + gLogger.warn("No CEQueue in local configuration, looking to find one in CS") + siteName = DIRAC.siteName() + queueSection = f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Queues" + res = gConfig.getSections(queueSection) + if not res["OK"]: + raise RuntimeError(res["Message"]) + queues = res["Value"] + cpuTimes = [gConfig.getValue(queueSection + "/" + queue + "/maxCPUTime", 0.0) for queue in queues] + cpuTimes = [t for t in cpuTimes if t > 0] + if cpuTimes: cpuTimeLeft = min(cpuTimes) * 60 + else: + queueInfo = getQueueInfo(f"{gridCE}/{ceQueue}") + if not queueInfo["OK"] or not queueInfo["Value"]: + gLogger.warn("Can't find a CE/queue in CS") else: - queueInfo = getQueueInfo(f"{gridCE}/{ceQueue}") - cpuTimeLeft = 9999999.0 - if not queueInfo["OK"] or not queueInfo["Value"]: - gLogger.warn("Can't find a CE/queue, defaulting CPUTime to %d" % cpuTimeLeft) + queueCSSection = queueInfo["Value"]["QueueCSSection"] + cpuTimeInMinutes = gConfig.getValue(f"{queueCSSection}/maxCPUTime", 0.0) + if cpuTimeInMinutes: + cpuTimeLeft = cpuTimeInMinutes * 60.0 + 
gLogger.info(f"CPUTime for {queueCSSection}: {cpuTimeLeft:f}") else: - queueCSSection = queueInfo["Value"]["QueueCSSection"] - # These are (real, wall clock) minutes - damn BDII! - cpuTimeInMinutes = gConfig.getValue(f"{queueCSSection}/maxCPUTime", 0.0) - if cpuTimeInMinutes: - cpuTimeLeft = cpuTimeInMinutes * 60.0 - gLogger.info(f"CPUTime for {queueCSSection}: {cpuTimeLeft:f}") - else: - gLogger.warn(f"Can't find maxCPUTime for {queueCSSection}, defaulting CPUTime to {cpuTimeLeft:f}") + gLogger.warn(f"Can't find maxCPUTime for {queueCSSection}") + + if not cpuTimeLeft: + # 3. Last resort: global default from CS, or 0 (fail safe: match no more jobs) + cpuTimeLeft = gConfig.getValue("/Resources/Computing/CEDefaults/MaxCPUTime", 0) + if cpuTimeLeft: + gLogger.warn(f"Using fallback MaxCPUTime: {cpuTimeLeft}") + else: + gLogger.warn("Could not determine CPUTime left") return int(cpuTimeLeft) diff --git a/src/DIRAC/WorkloadManagementSystem/Client/test/Test_CPUNormalization.py b/src/DIRAC/WorkloadManagementSystem/Client/test/Test_CPUNormalization.py new file mode 100644 index 00000000000..fd472fae549 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Client/test/Test_CPUNormalization.py @@ -0,0 +1,137 @@ +"""Unit tests for CPUNormalization.getCPUTime()""" +from unittest.mock import patch + +from DIRAC import S_OK, S_ERROR + + +@patch("DIRAC.WorkloadManagementSystem.Client.CPUNormalization.TimeLeft") +@patch("DIRAC.WorkloadManagementSystem.Client.CPUNormalization.gConfig") +class TestGetCPUTime: + """Tests for getCPUTime() fallback chain.""" + + def _import_getCPUTime(self): + from DIRAC.WorkloadManagementSystem.Client.CPUNormalization import getCPUTime + + return getCPUTime + + def test_from_batch_system(self, mock_gConfig, mock_TimeLeft): + """Primary path: batch system returns CPU work left.""" + mock_gConfig.getValue.return_value = 0 + mock_TimeLeft.return_value.getTimeLeft.return_value = S_OK(30000) # HS06*s + + result = 
self._import_getCPUTime()(cpuNormalizationFactor=10.0) + + # 30000 / 10.0 = 3000 seconds + assert result == 3000 + mock_TimeLeft.return_value.getTimeLeft.assert_called_once() + + def test_batch_system_returns_zero(self, mock_gConfig, mock_TimeLeft): + """When batch system reports 0 time left, trust it and return 0.""" + mock_gConfig.getValue.return_value = 0 + mock_TimeLeft.return_value.getTimeLeft.return_value = S_OK(0) + + result = self._import_getCPUTime()(cpuNormalizationFactor=10.0) + + assert result == 0 + # Should NOT fall through to CS fallbacks + mock_gConfig.getValue.assert_not_called() + + def test_from_queue_cs(self, mock_gConfig, mock_TimeLeft): + """Fallback: batch system fails, uses queue maxCPUTime from CS.""" + mock_TimeLeft.return_value.getTimeLeft.return_value = S_ERROR("No batch info") + + config_values = { + "/LocalSite/GridCE": "ce.example.com", + "/LocalSite/CEQueue": "default", + "/LocalSite/Site": "LCG.Example.com", + } + + def mock_getValue(key, default=0): + if key in config_values: + return config_values[key] + # maxCPUTime in minutes + if "maxCPUTime" in key: + return 120.0 # 120 minutes + return default + + mock_gConfig.getValue.side_effect = mock_getValue + + with patch( + "DIRAC.WorkloadManagementSystem.Client.CPUNormalization.getQueueInfo", + return_value=S_OK( + {"QueueCSSection": "/Resources/Sites/LCG/LCG.Example.com/CEs/ce.example.com/Queues/default"} + ), + ): + result = self._import_getCPUTime()(cpuNormalizationFactor=10.0) + + # 120 minutes * 60 = 7200 seconds + assert result == 7200 + + def test_fallback_max_cpu_time(self, mock_gConfig, mock_TimeLeft): + """Last resort: everything fails, uses /Resources/Computing/CEDefaults/MaxCPUTime.""" + mock_TimeLeft.return_value.getTimeLeft.return_value = S_ERROR("No batch info") + + config_values = { + "/LocalSite/GridCE": "ce.example.com", + "/LocalSite/CEQueue": "default", + "/LocalSite/Site": "LCG.Example.com", + "/Resources/Computing/CEDefaults/MaxCPUTime": 86400, + } + + def 
mock_getValue(key, default=0): + if key in config_values: + return config_values[key] + return default + + mock_gConfig.getValue.side_effect = mock_getValue + + with patch( + "DIRAC.WorkloadManagementSystem.Client.CPUNormalization.getQueueInfo", + return_value=S_OK( + {"QueueCSSection": "/Resources/Sites/LCG/LCG.Example.com/CEs/ce.example.com/Queues/default"} + ), + ): + result = self._import_getCPUTime()(cpuNormalizationFactor=10.0) + + # maxCPUTime from queue returned 0, so falls through to CEDefaults/MaxCPUTime + assert result == 86400 + + def test_nothing_available_returns_zero(self, mock_gConfig, mock_TimeLeft): + """Fail safe: no batch info, no CS config, returns 0.""" + mock_TimeLeft.return_value.getTimeLeft.return_value = S_ERROR("No batch info") + + config_values = { + "/LocalSite/GridCE": "ce.example.com", + "/LocalSite/CEQueue": "default", + "/LocalSite/Site": "LCG.Example.com", + } + + def mock_getValue(key, default=0): + if key in config_values: + return config_values[key] + return default + + mock_gConfig.getValue.side_effect = mock_getValue + + with patch( + "DIRAC.WorkloadManagementSystem.Client.CPUNormalization.getQueueInfo", + return_value=S_OK( + {"QueueCSSection": "/Resources/Sites/LCG/LCG.Example.com/CEs/ce.example.com/Queues/default"} + ), + ): + result = self._import_getCPUTime()(cpuNormalizationFactor=10.0) + + assert result == 0 + + def test_cpu_normalization_factor_from_config(self, mock_gConfig, mock_TimeLeft): + """When cpuNormalizationFactor=0, it should be read from local config.""" + mock_TimeLeft.return_value.getTimeLeft.return_value = S_OK(50000) # HS06*s + + mock_gConfig.getValue.side_effect = lambda key, default=0: { + "/LocalSite/CPUNormalizationFactor": 5.0, + }.get(key, default) + + result = self._import_getCPUTime()(cpuNormalizationFactor=0) + + # 50000 / 5.0 = 10000 seconds + assert result == 10000 diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py 
b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py index 089de1967f2..c5803e08e52 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py @@ -7,9 +7,6 @@ Information is returned to the WMS via the heart-beat mechanism. This also interprets control signals from the WMS e.g. to kill a running job. - -- Still to implement: - - CPU normalization for correct comparison with job limit """ import datetime @@ -27,7 +24,6 @@ from DIRAC.ConfigurationSystem.Client.Config import gConfig from DIRAC.Core.Utilities.Os import getDiskSpace from DIRAC.Core.Utilities.Profiler import Profiler -from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient @@ -85,10 +81,9 @@ def __init__(self, pid, exeThread, spObject, jobCPUTime, memoryLimit=0, processo self.wallClockCheckCount = 0 self.nullCPUCount = 0 - self.grossTimeLeftLimit = 10 * self.checkingTime - self.timeLeftUtil = TimeLeft() self.timeLeft = 0 - self.littleTimeLeft = False + self.initialWallClockLeft = 0 + self.stopMargin = 300 # seconds of wall-clock time to reserve for post-processing self.cpuPower = 1.0 self.processors = processors @@ -132,11 +127,15 @@ def initialize(self): ) self.checkingTime = self.minCheckingTime - # The time left is returned in seconds @ 250 SI00 = 1 HS06, - # the self.checkingTime and self.pollingTime are in seconds, - # thus they need to be multiplied by a large enough factor - self.fineTimeLeftLimit = gConfig.getValue(self.section + "/TimeLeftLimit", 150 * self.pollingTime) self.cpuPower = gConfig.getValue("/LocalSite/CPUNormalizationFactor", 1.0) + self.stopMargin = gConfig.getValue(self.section + "/StopMargin", self.stopMargin) + + # Read CPU work left from config (written by JobAgent) and convert to wall-clock seconds + cpuWorkLeft = 
gConfig.getValue("/LocalSite/CPUTimeLeft", 0) + if cpuWorkLeft and self.cpuPower: + self.initialWallClockLeft = cpuWorkLeft / self.cpuPower + else: + self.initialWallClockLeft = 0 return S_OK() @@ -185,16 +184,15 @@ def execute(self): ): self.wallClockCheckCount += 1 - if self.littleTimeLeft: - # if we have gone over enough iterations query again - if self.littleTimeLeftCount == 0 and self.__timeLeft() == -1: - self.checkError = JobMinorStatus.JOB_EXCEEDED_CPU - self.log.error(self.checkError, self.timeLeft) + # Check time left on every poll cycle (cheap: just reads wall-clock) + if self.testTimeLeft: + result = self.__checkTimeLeft() + if not result["OK"]: + self.checkError = result["Message"] + self.log.error(self.checkError, f"wallClockLeft={result.get('Value', 'N/A')}s") self.spObject.killChild() return S_OK() - self.littleTimeLeftCount -= 1 - # Note: need to poll regularly to see if the thread is alive # but only perform checks with a certain frequency if (time.time() - self.initialValues["StartTime"]) > self.checkingTime * self.checkCount: @@ -449,9 +447,10 @@ def _checkProgress(self): report += "CPULimit: NA, " if self.testTimeLeft: - self.__timeLeft() - if self.timeLeft: + if self.initialWallClockLeft: report += "TimeLeft: OK" + else: + report += "TimeLeft: N/A (not available)" else: report += "TimeLeft: NA" @@ -721,34 +720,30 @@ def calibrate(self): self.__reportParameters(self.initialValues, "InitialValues") return S_OK() - def __timeLeft(self): - """ - return Normalized CPU time left in the batch system - 0 if not available - update self.timeLeft and self.littleTimeLeft + def __checkTimeLeft(self): + """Check if there is enough wall-clock time left in the batch slot. + + The initial wall-clock time left is read from the local configuration at initialization + (written by the JobAgent). A simple countdown is then used to determine the remaining time. 
+ + Returns S_ERROR when the remaining wall-clock time drops below the configurable StopMargin + (default: 300s), leaving enough time for post-processing (output upload, cleanup, etc.). """ - # Get CPU time left in the batch system - result = self.timeLeftUtil.getTimeLeft(0.0) - if not result["OK"]: - # Could not get CPU time left, we might need to wait for the first loop - # or the Utility is not working properly for this batch system - # or we are in a batch system - timeLeft = 0 - else: - timeLeft = result["Value"] - - self.timeLeft = timeLeft - if not self.littleTimeLeft: - if timeLeft and timeLeft < self.grossTimeLeftLimit: - self.log.info("Checking with higher frequency as TimeLeft below grossTimeLeftLimit", f"{timeLeft=}") - self.littleTimeLeft = True - # TODO: better configurable way of doing this to be coded - self.littleTimeLeftCount = 15 - else: - if self.timeLeft and self.timeLeft < self.fineTimeLeftLimit: - timeLeft = -1 + if not self.initialWallClockLeft: + return S_OK("TimeLeft not available") + + elapsed = time.time() - self.initialValues["StartTime"] + wallClockLeft = self.initialWallClockLeft - elapsed + + # Update self.timeLeft in HS06*s for heartbeat display + self.timeLeft = wallClockLeft * self.cpuPower + + if wallClockLeft < self.stopMargin: + result = S_ERROR(JobMinorStatus.JOB_EXCEEDED_CPU) + result["Value"] = wallClockLeft + return result - return timeLeft + return S_OK(wallClockLeft) ############################################################################# def __getUsageSummary(self): diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_Watchdog.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_Watchdog.py index 8dc07b2c3d2..d7c998c5f7d 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_Watchdog.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_Watchdog.py @@ -1,7 +1,7 @@ -""" unit test for Watchdog.py -""" +"""unit test for Watchdog.py""" import os -from unittest.mock import 
MagicMock +import time +from unittest.mock import MagicMock, patch # sut from DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog import Watchdog @@ -37,3 +37,70 @@ def test__performChecksFull(): assert res["OK"] is True res = wd._performChecks() assert res["OK"] is True + + +class TestCheckTimeLeft: + """Tests for the simplified wall-clock countdown time-left logic.""" + + def _make_watchdog(self, initialWallClockLeft=0, stopMargin=300, cpuPower=10.0): + pid = os.getpid() + wd = Watchdog(pid, mock_exeThread, mock_spObject, 5000) + wd.initialWallClockLeft = initialWallClockLeft + wd.stopMargin = stopMargin + wd.cpuPower = cpuPower + wd.initialValues = {"StartTime": time.time()} + wd.testTimeLeft = 1 + return wd + + def test_time_left_not_available(self): + """When CPUTimeLeft was not set, the check should pass gracefully.""" + wd = self._make_watchdog(initialWallClockLeft=0) + result = wd._Watchdog__checkTimeLeft() + assert result["OK"] is True + + def test_plenty_of_time_left(self): + """When there's plenty of time, the check should pass.""" + wd = self._make_watchdog(initialWallClockLeft=3600, stopMargin=300) + result = wd._Watchdog__checkTimeLeft() + assert result["OK"] is True + assert result["Value"] > 3000 + + def test_below_stop_margin(self): + """When wall-clock left drops below stop margin, the check should fail.""" + wd = self._make_watchdog(initialWallClockLeft=3600, stopMargin=300) + # Pretend the job started 3500 seconds ago (only 100s left, below 300s margin) + wd.initialValues["StartTime"] = time.time() - 3500 + result = wd._Watchdog__checkTimeLeft() + assert not result["OK"] + + def test_time_left_updates_heartbeat_value(self): + """self.timeLeft should be updated in HS06*s for heartbeat display.""" + wd = self._make_watchdog(initialWallClockLeft=3600, cpuPower=10.0) + wd._Watchdog__checkTimeLeft() + # timeLeft should be approximately 3600 * 10 = 36000 HS06*s + assert wd.timeLeft > 35000 + + def test_exact_stop_margin_boundary(self): + """When 
wall-clock left exactly equals the stop margin, the check should pass (comparison is strict <, not <=).""" + wd = self._make_watchdog(initialWallClockLeft=1000, stopMargin=300) + # 700s elapsed → 300s left, which is not < 300 → should pass + wd.initialValues["StartTime"] = time.time() - 700 + result = wd._Watchdog__checkTimeLeft() + assert result["OK"] is True + + @patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.gConfig") + def test_initialize_reads_config(self, mock_gConfig): + """initialize() should read CPUTimeLeft and convert to wall-clock seconds.""" + config_values = { + "/LocalSite/CPUTimeLeft": 36000, # 36000 HS06*s + "/LocalSite/CPUNormalizationFactor": 10.0, # 10 HS06 + } + mock_gConfig.getValue.side_effect = lambda key, default=None: config_values.get(key, default) + + pid = os.getpid() + wd = Watchdog(pid, mock_exeThread, mock_spObject, 5000) + wd.calibrate() + wd.initialize() + + # 36000 / 10.0 = 3600 wall-clock seconds + assert wd.initialWallClockLeft == 3600.0