diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py index 23ef2c38a7f..d9a1a49c7c0 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py @@ -544,41 +544,54 @@ def postProcess( """This method is called after the payload has finished running.""" self.log.info(f"Job Wrapper is starting the post processing phase for job {self.jobID}") - # if the execution thread didn't complete - if payloadStatus is None and not payloadExecutorError: + # process() and postProcess() may run on different machines: propagate the + # CPU figures to executionResults early so the accounting record sees them + # even if we hit one of the early-return failure paths below. + if cpuTimeConsumed and not self.executionResults.get("CPU"): + self.executionResults["CPU"] = cpuTimeConsumed + + # No clean exit code: identify the cause (watchdog > executor > generic) + # Watchdog kills (killChild -> SIGTERM/SIGKILL) may leave payloadStatus=None, + # so checking watchdogError first preserves its specific reason. + if payloadStatus is None: + self.__logWatchdogStats(watchdogStats) + if payloadOutput and self.executionResults.get("CPU"): + self.__sendFinalStdOut(payloadOutput) + + if watchdogError: + self.log.error("Payload killed by watchdog", watchdogError) + self.__report(status=JobStatus.FAILED, minorStatus=watchdogError, sendFlag=True) + self.__setJobParam("ApplicationError", watchdogError, sendFlag=True) + return S_ERROR(f"Payload killed by watchdog: {watchdogError}") + if payloadExecutorError: + self.log.error("Failed to execute the payload", payloadExecutorError) + self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_THREAD_FAILED, sendFlag=True) + self.__setJobParam("ApplicationError", "None reported", sendFlag=True) + return S_ERROR(payloadExecutorError) self.log.error("Application thread did not complete") self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_THREAD_NOT_COMPLETE, sendFlag=True) self.__setJobParam("ApplicationError", JobMinorStatus.APP_THREAD_NOT_COMPLETE, sendFlag=True) return S_ERROR("No outputs generated from job execution") - # If the execution thread got an error (not a payload error) + # payloadStatus is set: payload exited with a real exit code. + if payloadExecutorError: self.log.error("Failed to execute the payload", payloadExecutorError) self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_THREAD_FAILED, sendFlag=True) - applicationErrorStatus = "None reported" - if payloadStatus: - applicationErrorStatus = str(payloadStatus) - self.__setJobParam("ApplicationError", applicationErrorStatus, sendFlag=True) + self.__setJobParam("ApplicationError", str(payloadStatus), sendFlag=True) - # This might happen if process() and postProcess() are called on different machines - if cpuTimeConsumed and not self.executionResults.get("CPU"): - self.executionResults["CPU"] = cpuTimeConsumed - - if watchdogError: + # Trust the payload's exit code over the watchdog: an elastic payload that catches + # the signal and exits cleanly should be reported per its exit code. + cleanExit = payloadStatus == 0 or payloadStatus in (DErrno.EWMSRESC, DErrno.EWMSRESC & 255) + if watchdogError and not cleanExit: self.__report(status=JobStatus.FAILED, minorStatus=watchdogError, sendFlag=True) - if watchdogStats: - self.log.info( - "Statistics collected by the Watchdog:\n ", - "\n ".join(["%s: %s" % items for items in watchdogStats.items()]), - ) # can be an iterator - - if payloadStatus is None: - return S_ERROR("No outputs generated from job execution") + self.__logWatchdogStats(watchdogStats) - # Send final heartbeat of a configurable number of lines here - self.log.verbose("Sending final application standard output heartbeat") - self.__sendFinalStdOut(payloadOutput) + # Send final heartbeat of a configurable number of lines here. + if self.executionResults.get("CPU"): + self.log.verbose("Sending final application standard output heartbeat") + self.__sendFinalStdOut(payloadOutput) self.log.verbose(f"Execution thread status = {payloadStatus}") self.log.info("Checking directory contents after execution:") @@ -591,10 +604,13 @@ def postProcess( # no timeout and exit code is 0 self.log.info(res["Value"][1]) + # Non-zero exit without a watchdog reason: report a generic application error. if not watchdogError and payloadStatus != 0: self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_ERRORS, sendFlag=True) - if not watchdogError and payloadStatus in ( + # Reschedule and success branches honour the payload's exit code regardless of + # the watchdog: the payload survived the signal long enough to declare its outcome. + if payloadStatus in ( DErrno.EWMSRESC, DErrno.EWMSRESC & 255, ): # the status will be truncated to 0xDE (222) @@ -602,7 +618,7 @@ def postProcess( self.__report(minorStatus=JobMinorStatus.GOING_RESCHEDULE, sendFlag=True) return S_ERROR(DErrno.EWMSRESC, "Job will be rescheduled") - if not watchdogError and payloadStatus == 0: + if payloadStatus == 0: self.failedFlag = False self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_SUCCESS, sendFlag=True) @@ -702,6 +718,15 @@ def __getCPUHMS(self, cpuTime): self.log.verbose(f"Human readable CPU time is: {humanTime}") return S_OK((cpuTime, humanTime)) + ############################################################################# + def __logWatchdogStats(self, watchdogStats: dict): + """Log the stats collected by the Watchdog, if any.""" + if watchdogStats: + self.log.info( + "Statistics collected by the Watchdog:\n ", + "\n ".join(["%s: %s" % items for items in watchdogStats.items()]), + ) + ############################################################################# def resolveInputData(self): """Input data is resolved here using a VO specific plugin module.""" diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py index 947a7d0590e..019bdf4605e 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py @@ -217,13 +217,22 @@ def executePayload(job: JobWrapper) -> bool: ) job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) return False - gLogger.exception("Job failed in execution phase") + gLogger.error("Job failed in execution phase", repr(exc)) job.jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - job.jobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + # If postProcess already stamped FAILED with a specific minor status (e.g. a + # watchdog reason like JOB_EXCEEDED_CPU), preserve it. Otherwise (postProcess + # bypassed) fall back to EXCEPTION_DURING_EXEC so the job isn't left RUNNING. + if job.wmsMajorStatus == JobStatus.FAILED: + job.sendFailoverRequest() + job.sendJobAccounting() + else: + job.jobReport.setJobStatus( + status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False + ) + job.sendFailoverRequest() + # setJobStatus on jobReport does not propagate to the JobWrapper's + # wmsMajorStatus / wmsMinorStatus, so pass them explicitly. + job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) return False except Exception as exc: # pylint: disable=broad-except gLogger.exception("Job raised exception during execution phase") diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py index b9893279eb7..7a6cc28d797 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py @@ -519,6 +519,161 @@ def test_postProcess_watchdog_error(setup_job_wrapper, mocker, mock_report_and_s assert report_args[-1]["minorStatus"] == payloadResult["watchdogError"] +def test_postProcess_watchdog_signal_payload_reschedules(setup_job_wrapper, mocker, mock_report_and_set_param): + """Watchdog signalled but the payload exited with EWMSRESC. + + The reschedule branch must honour the payload's request even when the watchdog + fired -- second prong of the "trust the exit code" change. The job should end + up rescheduled, not stuck FAILED with the watchdog reason. + """ + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": DErrno.EWMSRESC, + "payloadOutput": "", + "payloadExecutorError": None, + "cpuTimeConsumed": [100, 200, 300, 400, 500], + "watchdogError": JobMinorStatus.JOB_EXCEEDED_CPU, + "watchdogStats": {"LastUpdateCPU(s)": "100", "MemoryUsed(MB)": "100"}, + } + jw.executionResults["CPU"] = payloadResult["cpuTimeConsumed"] + + result = jw.postProcess(**payloadResult) + assert not result["OK"] + assert result["Errno"] == DErrno.EWMSRESC + # Final report is the reschedule transition. + assert report_args[-1]["minorStatus"] == JobMinorStatus.GOING_RESCHEDULE + + +def test_postProcess_no_status_executor_and_watchdog_priority(setup_job_wrapper, mocker, mock_report_and_set_param): + """payloadStatus=None with both executor error and watchdog error set. + + Watchdog must win -- it's the proximate cause when both are present (the kill + is what made the executor record an error in the first place). + """ + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": None, + "payloadOutput": None, + "payloadExecutorError": "systemCall failed", + "cpuTimeConsumed": None, + "watchdogError": JobMinorStatus.JOB_EXCEEDED_CPU, + "watchdogStats": None, + } + + result = jw.postProcess(**payloadResult) + assert not result["OK"] + assert "Payload killed by watchdog" in result["Message"] + assert report_args[-1]["minorStatus"] == JobMinorStatus.JOB_EXCEEDED_CPU + # The executor-error branch's APP_THREAD_FAILED must NOT appear -- watchdog wins. + assert not any(call.get("minorStatus") == JobMinorStatus.APP_THREAD_FAILED for call in report_args) + + +def test_postProcess_watchdog_signal_payload_exits_clean(setup_job_wrapper, mocker, mock_report_and_set_param): + """Watchdog signaled the payload but the payload caught it and exited 0. + + Models the elastic-job pattern (e.g. Gauss): the watchdog sends a signal, the + payload finishes its current event and exits with code 0. We trust the exit code + and report success per the payload, not failure per the watchdog. + """ + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": 0, + "payloadOutput": "", + "payloadExecutorError": None, + "cpuTimeConsumed": [100, 200, 300, 400, 500], + "watchdogError": JobMinorStatus.JOB_EXCEEDED_CPU, + "watchdogStats": {"LastUpdateCPU(s)": "100", "MemoryUsed(MB)": "100"}, + } + jw.executionResults["CPU"] = payloadResult["cpuTimeConsumed"] + + result = jw.postProcess(**payloadResult) + assert result["OK"] + # Final status must reflect the payload's clean exit, not the watchdog's signal. + assert report_args[-1]["status"] == JobStatus.COMPLETING + assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_SUCCESS + assert not jw.failedFlag + # No FAILED stamp, no watchdog reason should appear in the user-visible chain. + assert not any(call.get("status") == JobStatus.FAILED for call in report_args) + assert not any(call.get("minorStatus") == JobMinorStatus.JOB_EXCEEDED_CPU for call in report_args) + + +def test_postProcess_watchdog_killed_payload_with_partial_output(setup_job_wrapper, mocker, mock_report_and_set_param): + """Watchdog killed the payload but partial output and CPU figures are available. + + The wrapper should still send the partial output as the final heartbeat -- the + user often wants to see how far the payload got before the kill. + """ + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + sendFinal = mocker.patch.object(jw, "_JobWrapper__sendFinalStdOut") + + payloadResult = { + "payloadStatus": None, + "payloadOutput": "Last log line before SIGTERM\n", + "payloadExecutorError": None, + "cpuTimeConsumed": [100, 200, 300, 400, 500], + "watchdogError": JobMinorStatus.JOB_EXCEEDED_CPU, + "watchdogStats": {"LastUpdateCPU(s)": "100"}, + } + jw.executionResults["CPU"] = payloadResult["cpuTimeConsumed"] + + result = jw.postProcess(**payloadResult) + + assert not result["OK"] + sendFinal.assert_called_once_with(payloadResult["payloadOutput"]) + # Watchdog status still reported correctly. + assert report_args[-1]["minorStatus"] == JobMinorStatus.JOB_EXCEEDED_CPU + + +def test_postProcess_watchdog_killed_payload(setup_job_wrapper, mocker, mock_report_and_set_param): + """Watchdog killed the payload mid-flight: payloadStatus=None + watchdogError set. + + Regression test for the path where killChild leaves no clean exit code: the + watchdog's verdict must be reported as the minor status, not silently lost + behind APP_THREAD_NOT_COMPLETE / "No outputs generated". + """ + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param + + mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) + mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) + + payloadResult = { + "payloadStatus": None, + "payloadOutput": None, + "payloadExecutorError": None, + "cpuTimeConsumed": None, + "watchdogError": JobMinorStatus.JOB_EXCEEDED_CPU, + "watchdogStats": None, + } + + result = jw.postProcess(**payloadResult) + assert not result["OK"] + assert "Payload killed by watchdog" in result["Message"] + assert JobMinorStatus.JOB_EXCEEDED_CPU in result["Message"] + assert report_args[-1]["status"] == JobStatus.FAILED + assert report_args[-1]["minorStatus"] == JobMinorStatus.JOB_EXCEEDED_CPU + assert set_param_args[-1][0][1] == JobMinorStatus.JOB_EXCEEDED_CPU + + def test_postProcess_executor_failed_no_status(setup_job_wrapper, mocker, mock_report_and_set_param): """Test the postProcess method of the JobWrapper class: executor failed and no status defined.""" jw = setup_job_wrapper() diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapperUtilities.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapperUtilities.py new file mode 100644 index 00000000000..6028f8f1ad0 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapperUtilities.py @@ -0,0 +1,89 @@ +"""Unit tests for JobWrapperUtilities.executePayload's failure-reporting paths. + +These complement the integration tests in Test_JobWrapperTemplate.py. +""" +from unittest.mock import MagicMock + +from DIRAC import S_ERROR +from DIRAC.Core.Utilities import DErrno +from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import executePayload + + +def _make_mock_job(execute_result, wmsMajorStatus="Running"): + """Build a minimal JobWrapper-like mock that returns the given execute_result.""" + job = MagicMock() + job.jobID = 1 + job.execute.return_value = execute_result + job.wmsMajorStatus = wmsMajorStatus + job.jobReport = MagicMock() + return job + + +def test_executePayload_preserves_minor_status_when_postProcess_set_failed(): + """When postProcess set wmsMajorStatus=FAILED, executePayload must not overwrite + the minor status with EXCEPTION_DURING_EXEC. + + This is the path triggered by a watchdog kill: postProcess returns + S_ERROR("Payload killed by watchdog: ...") with the minor status already set + to (e.g.) JOB_EXCEEDED_CPU. The wrapper must respect that. + """ + job = _make_mock_job( + execute_result=S_ERROR(f"Payload killed by watchdog: {JobMinorStatus.JOB_EXCEEDED_CPU}"), + wmsMajorStatus=JobStatus.FAILED, + ) + + assert executePayload(job) is False + + # setJobStatus with EXCEPTION_DURING_EXEC must not have been called. + for call in job.jobReport.setJobStatus.call_args_list: + assert call.kwargs.get("minorStatus") != JobMinorStatus.EXCEPTION_DURING_EXEC + + # sendJobAccounting must be called with no minor-status override (preserves + # whatever wmsMinorStatus postProcess set on the job). + job.sendJobAccounting.assert_called_once_with() + + +def test_executePayload_falls_back_to_exception_when_postProcess_was_bypassed(): + """When postProcess never ran (e.g. process() returned S_ERROR before reaching + postProcess), wmsMajorStatus is still RUNNING and no minor-status was reported. + Fall back to EXCEPTION_DURING_EXEC so the user at least sees the job failed. + """ + job = _make_mock_job( + execute_result=S_ERROR("Payload process could not start after 5 seconds"), + wmsMajorStatus="Running", + ) + + assert executePayload(job) is False + + # The fallback override must have been called. + failed_calls = [ + call + for call in job.jobReport.setJobStatus.call_args_list + if call.kwargs.get("minorStatus") == JobMinorStatus.EXCEPTION_DURING_EXEC + ] + assert len(failed_calls) == 1 + assert failed_calls[0].kwargs["status"] == JobStatus.FAILED + + # sendJobAccounting must be called WITH explicit FAILED kwargs in the bypass case: + # setJobStatus on jobReport doesn't update wmsMajorStatus / wmsMinorStatus on the + # JobWrapper, so without these kwargs the accounting record would still show the + # stale "Running" / "Application" state. + job.sendJobAccounting.assert_called_once_with( + status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC + ) + + +def test_executePayload_reschedule_path_unchanged(): + """EWMSRESC sub-branch returns early and is not affected by the clobber fix.""" + job = _make_mock_job( + execute_result=S_ERROR(DErrno.EWMSRESC, "Job will be rescheduled"), + wmsMajorStatus=JobStatus.FAILED, + ) + + assert executePayload(job) is False + + # The reschedule branch calls sendJobAccounting with explicit args. + job.sendJobAccounting.assert_called_once() + call = job.sendJobAccounting.call_args + assert call.kwargs.get("minorStatus") == JobMinorStatus.JOB_WRAPPER_EXECUTION