Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 50 additions & 25 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,41 +544,54 @@ def postProcess(
"""This method is called after the payload has finished running."""
self.log.info(f"Job Wrapper is starting the post processing phase for job {self.jobID}")

# if the execution thread didn't complete
if payloadStatus is None and not payloadExecutorError:
# process() and postProcess() may run on different machines: propagate the
# CPU figures to executionResults early so the accounting record sees them
# even if we hit one of the early-return failure paths below.
if cpuTimeConsumed and not self.executionResults.get("CPU"):
self.executionResults["CPU"] = cpuTimeConsumed

# No clean exit code: identify the cause (watchdog > executor > generic)
# Watchdog kills (killChild -> SIGTERM/SIGKILL) may leave payloadStatus=None,
# so checking watchdogError first preserves its specific reason.
if payloadStatus is None:
self.__logWatchdogStats(watchdogStats)
if payloadOutput and self.executionResults.get("CPU"):
self.__sendFinalStdOut(payloadOutput)

if watchdogError:
self.log.error("Payload killed by watchdog", watchdogError)
self.__report(status=JobStatus.FAILED, minorStatus=watchdogError, sendFlag=True)
self.__setJobParam("ApplicationError", watchdogError, sendFlag=True)
return S_ERROR(f"Payload killed by watchdog: {watchdogError}")
if payloadExecutorError:
self.log.error("Failed to execute the payload", payloadExecutorError)
self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_THREAD_FAILED, sendFlag=True)
self.__setJobParam("ApplicationError", "None reported", sendFlag=True)
return S_ERROR(payloadExecutorError)
self.log.error("Application thread did not complete")
self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_THREAD_NOT_COMPLETE, sendFlag=True)
self.__setJobParam("ApplicationError", JobMinorStatus.APP_THREAD_NOT_COMPLETE, sendFlag=True)
return S_ERROR("No outputs generated from job execution")

# If the execution thread got an error (not a payload error)
# payloadStatus is set: payload exited with a real exit code.

if payloadExecutorError:
self.log.error("Failed to execute the payload", payloadExecutorError)
self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_THREAD_FAILED, sendFlag=True)
applicationErrorStatus = "None reported"
if payloadStatus:
applicationErrorStatus = str(payloadStatus)
self.__setJobParam("ApplicationError", applicationErrorStatus, sendFlag=True)
self.__setJobParam("ApplicationError", str(payloadStatus), sendFlag=True)

# This might happen if process() and postProcess() are called on different machines
if cpuTimeConsumed and not self.executionResults.get("CPU"):
self.executionResults["CPU"] = cpuTimeConsumed

if watchdogError:
# Trust the payload's exit code over the watchdog: an elastic payload that catches
# the signal and exits cleanly should be reported per its exit code.
cleanExit = payloadStatus == 0 or payloadStatus in (DErrno.EWMSRESC, DErrno.EWMSRESC & 255)
if watchdogError and not cleanExit:
self.__report(status=JobStatus.FAILED, minorStatus=watchdogError, sendFlag=True)

if watchdogStats:
self.log.info(
"Statistics collected by the Watchdog:\n ",
"\n ".join(["%s: %s" % items for items in watchdogStats.items()]),
) # can be an iterator

if payloadStatus is None:
return S_ERROR("No outputs generated from job execution")
self.__logWatchdogStats(watchdogStats)

# Send final heartbeat of a configurable number of lines here
self.log.verbose("Sending final application standard output heartbeat")
self.__sendFinalStdOut(payloadOutput)
# Send final heartbeat of a configurable number of lines here.
if self.executionResults.get("CPU"):
self.log.verbose("Sending final application standard output heartbeat")
self.__sendFinalStdOut(payloadOutput)
self.log.verbose(f"Execution thread status = {payloadStatus}")

self.log.info("Checking directory contents after execution:")
Expand All @@ -591,18 +604,21 @@ def postProcess(
# no timeout and exit code is 0
self.log.info(res["Value"][1])

# Non-zero exit without a watchdog reason: report a generic application error.
if not watchdogError and payloadStatus != 0:
self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_ERRORS, sendFlag=True)

if not watchdogError and payloadStatus in (
# Reschedule and success branches honour the payload's exit code regardless of
# the watchdog: the payload survived the signal long enough to declare its outcome.
if payloadStatus in (
DErrno.EWMSRESC,
DErrno.EWMSRESC & 255,
): # the status will be truncated to 0xDE (222)
self.log.verbose("job will be rescheduled")
self.__report(minorStatus=JobMinorStatus.GOING_RESCHEDULE, sendFlag=True)
return S_ERROR(DErrno.EWMSRESC, "Job will be rescheduled")

if not watchdogError and payloadStatus == 0:
if payloadStatus == 0:
self.failedFlag = False
self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_SUCCESS, sendFlag=True)

Expand Down Expand Up @@ -702,6 +718,15 @@ def __getCPUHMS(self, cpuTime):
self.log.verbose(f"Human readable CPU time is: {humanTime}")
return S_OK((cpuTime, humanTime))

#############################################################################
def __logWatchdogStats(self, watchdogStats: dict):
    """Write the statistics gathered by the Watchdog to the log at INFO level.

    Nothing is logged when ``watchdogStats`` is falsy (e.g. ``None`` or empty).

    :param watchdogStats: mapping whose items are rendered one per line as ``key: value``
    """
    if not watchdogStats:
        return
    statLines = [f"{statName}: {statValue}" for statName, statValue in watchdogStats.items()]
    self.log.info("Statistics collected by the Watchdog:\n ", "\n ".join(statLines))

#############################################################################
def resolveInputData(self):
"""Input data is resolved here using a VO specific plugin module."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,22 @@ def executePayload(job: JobWrapper) -> bool:
)
job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION)
return False
gLogger.exception("Job failed in execution phase")
gLogger.error("Job failed in execution phase", repr(exc))
job.jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False)
job.jobReport.setJobStatus(
status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
)
job.sendFailoverRequest()
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
# If postProcess already stamped FAILED with a specific minor status (e.g. a
# watchdog reason like JOB_EXCEEDED_CPU), preserve it. Otherwise (postProcess
# bypassed) fall back to EXCEPTION_DURING_EXEC so the job isn't left RUNNING.
if job.wmsMajorStatus == JobStatus.FAILED:
job.sendFailoverRequest()
job.sendJobAccounting()
else:
job.jobReport.setJobStatus(
status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False
)
job.sendFailoverRequest()
# setJobStatus on jobReport does not propagate to the JobWrapper's
# wmsMajorStatus / wmsMinorStatus, so pass them explicitly.
job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC)
return False
except Exception as exc: # pylint: disable=broad-except
gLogger.exception("Job raised exception during execution phase")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,161 @@ def test_postProcess_watchdog_error(setup_job_wrapper, mocker, mock_report_and_s
assert report_args[-1]["minorStatus"] == payloadResult["watchdogError"]


def test_postProcess_watchdog_signal_payload_reschedules(setup_job_wrapper, mocker, mock_report_and_set_param):
    """postProcess with a watchdog error set and a payload exit code of EWMSRESC.

    The call must return an error carrying ``DErrno.EWMSRESC`` and the last
    reported minor status must be GOING_RESCHEDULE, i.e. the payload's
    reschedule request is honoured even though the watchdog fired.
    """
    reportCalls, _paramCalls, onReport, onSetParam = mock_report_and_set_param
    wrapper = setup_job_wrapper()

    # Route the name-mangled private reporters through the recording side effects.
    mocker.patch.object(wrapper, "_JobWrapper__report", side_effect=onReport)
    mocker.patch.object(wrapper, "_JobWrapper__setJobParam", side_effect=onSetParam)

    cpuFigures = [100, 200, 300, 400, 500]
    wrapper.executionResults["CPU"] = cpuFigures

    outcome = wrapper.postProcess(
        payloadStatus=DErrno.EWMSRESC,
        payloadOutput="",
        payloadExecutorError=None,
        cpuTimeConsumed=cpuFigures,
        watchdogError=JobMinorStatus.JOB_EXCEEDED_CPU,
        watchdogStats={"LastUpdateCPU(s)": "100", "MemoryUsed(MB)": "100"},
    )

    assert not outcome["OK"]
    assert outcome["Errno"] == DErrno.EWMSRESC
    # The reschedule transition is the final thing reported.
    lastReport = reportCalls[-1]
    assert lastReport["minorStatus"] == JobMinorStatus.GOING_RESCHEDULE


def test_postProcess_no_status_executor_and_watchdog_priority(setup_job_wrapper, mocker, mock_report_and_set_param):
    """postProcess with no payload status while both an executor error and a watchdog error are set.

    The watchdog reason must take priority: the returned message mentions the
    watchdog kill, the last reported minor status is the watchdog's, and
    APP_THREAD_FAILED is never reported.
    """
    reportCalls, _paramCalls, onReport, onSetParam = mock_report_and_set_param
    wrapper = setup_job_wrapper()

    # Route the name-mangled private reporters through the recording side effects.
    mocker.patch.object(wrapper, "_JobWrapper__report", side_effect=onReport)
    mocker.patch.object(wrapper, "_JobWrapper__setJobParam", side_effect=onSetParam)

    outcome = wrapper.postProcess(
        payloadStatus=None,
        payloadOutput=None,
        payloadExecutorError="systemCall failed",
        cpuTimeConsumed=None,
        watchdogError=JobMinorStatus.JOB_EXCEEDED_CPU,
        watchdogStats=None,
    )

    assert not outcome["OK"]
    assert "Payload killed by watchdog" in outcome["Message"]
    assert reportCalls[-1]["minorStatus"] == JobMinorStatus.JOB_EXCEEDED_CPU
    # The executor-error minor status must be absent from every recorded report.
    reportedMinors = [entry.get("minorStatus") for entry in reportCalls]
    assert JobMinorStatus.APP_THREAD_FAILED not in reportedMinors


def test_postProcess_watchdog_signal_payload_exits_clean(setup_job_wrapper, mocker, mock_report_and_set_param):
    """postProcess with a watchdog error set but a payload exit code of 0.

    The exit code is trusted over the watchdog: the call succeeds, the last
    report is COMPLETING / APP_SUCCESS, ``failedFlag`` is cleared, and neither a
    FAILED status nor the watchdog reason shows up in any recorded report.
    """
    reportCalls, _paramCalls, onReport, onSetParam = mock_report_and_set_param
    wrapper = setup_job_wrapper()

    # Route the name-mangled private reporters through the recording side effects.
    mocker.patch.object(wrapper, "_JobWrapper__report", side_effect=onReport)
    mocker.patch.object(wrapper, "_JobWrapper__setJobParam", side_effect=onSetParam)

    cpuFigures = [100, 200, 300, 400, 500]
    wrapper.executionResults["CPU"] = cpuFigures

    outcome = wrapper.postProcess(
        payloadStatus=0,
        payloadOutput="",
        payloadExecutorError=None,
        cpuTimeConsumed=cpuFigures,
        watchdogError=JobMinorStatus.JOB_EXCEEDED_CPU,
        watchdogStats={"LastUpdateCPU(s)": "100", "MemoryUsed(MB)": "100"},
    )

    assert outcome["OK"]
    # The final report reflects the payload's clean exit.
    lastReport = reportCalls[-1]
    assert lastReport["status"] == JobStatus.COMPLETING
    assert lastReport["minorStatus"] == JobMinorStatus.APP_SUCCESS
    assert not wrapper.failedFlag
    # No report at all may carry FAILED or the watchdog reason.
    assert all(entry.get("status") != JobStatus.FAILED for entry in reportCalls)
    assert all(entry.get("minorStatus") != JobMinorStatus.JOB_EXCEEDED_CPU for entry in reportCalls)


def test_postProcess_watchdog_killed_payload_with_partial_output(setup_job_wrapper, mocker, mock_report_and_set_param):
    """postProcess with no payload status, a watchdog error, and partial output plus CPU figures.

    The partial output must be forwarded exactly once to the final-stdout
    sender, the call must fail, and the last reported minor status must be the
    watchdog reason.
    """
    reportCalls, _paramCalls, onReport, onSetParam = mock_report_and_set_param
    wrapper = setup_job_wrapper()

    # Route the name-mangled private reporters through the recording side effects.
    mocker.patch.object(wrapper, "_JobWrapper__report", side_effect=onReport)
    mocker.patch.object(wrapper, "_JobWrapper__setJobParam", side_effect=onSetParam)
    finalStdOutSender = mocker.patch.object(wrapper, "_JobWrapper__sendFinalStdOut")

    partialOutput = "Last log line before SIGTERM\n"
    cpuFigures = [100, 200, 300, 400, 500]
    wrapper.executionResults["CPU"] = cpuFigures

    outcome = wrapper.postProcess(
        payloadStatus=None,
        payloadOutput=partialOutput,
        payloadExecutorError=None,
        cpuTimeConsumed=cpuFigures,
        watchdogError=JobMinorStatus.JOB_EXCEEDED_CPU,
        watchdogStats={"LastUpdateCPU(s)": "100"},
    )

    assert not outcome["OK"]
    finalStdOutSender.assert_called_once_with(partialOutput)
    # The watchdog reason is still what gets reported last.
    assert reportCalls[-1]["minorStatus"] == JobMinorStatus.JOB_EXCEEDED_CPU


def test_postProcess_watchdog_killed_payload(setup_job_wrapper, mocker, mock_report_and_set_param):
    """postProcess with no payload status and only a watchdog error set.

    The returned message must name the watchdog kill and its reason, the last
    report must be FAILED with the watchdog reason as minor status, and the
    last job parameter set must carry that same reason as its value.
    """
    reportCalls, paramCalls, onReport, onSetParam = mock_report_and_set_param
    wrapper = setup_job_wrapper()

    # Route the name-mangled private reporters through the recording side effects.
    mocker.patch.object(wrapper, "_JobWrapper__report", side_effect=onReport)
    mocker.patch.object(wrapper, "_JobWrapper__setJobParam", side_effect=onSetParam)

    watchdogReason = JobMinorStatus.JOB_EXCEEDED_CPU
    outcome = wrapper.postProcess(
        payloadStatus=None,
        payloadOutput=None,
        payloadExecutorError=None,
        cpuTimeConsumed=None,
        watchdogError=watchdogReason,
        watchdogStats=None,
    )

    assert not outcome["OK"]
    errorMessage = outcome["Message"]
    assert "Payload killed by watchdog" in errorMessage
    assert watchdogReason in errorMessage

    lastReport = reportCalls[-1]
    assert lastReport["status"] == JobStatus.FAILED
    assert lastReport["minorStatus"] == watchdogReason
    # Second positional argument of the most recent __setJobParam call.
    assert paramCalls[-1][0][1] == watchdogReason


def test_postProcess_executor_failed_no_status(setup_job_wrapper, mocker, mock_report_and_set_param):
"""Test the postProcess method of the JobWrapper class: executor failed and no status defined."""
jw = setup_job_wrapper()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Unit tests for JobWrapperUtilities.executePayload's failure-reporting paths.

These complement the integration tests in Test_JobWrapperTemplate.py.
"""
from unittest.mock import MagicMock

from DIRAC import S_ERROR
from DIRAC.Core.Utilities import DErrno
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import executePayload


def _make_mock_job(execute_result, wmsMajorStatus="Running"):
    """Create a ``MagicMock`` standing in for a JobWrapper.

    :param execute_result: value the mock's ``execute()`` will return
    :param wmsMajorStatus: value stored on the mock's ``wmsMajorStatus`` attribute
    :return: the configured mock, with ``jobID`` set to 1 and a dedicated
        ``MagicMock`` attached as ``jobReport``
    """
    fakeJob = MagicMock()
    fakeJob.configure_mock(jobID=1, wmsMajorStatus=wmsMajorStatus, jobReport=MagicMock())
    fakeJob.execute.return_value = execute_result
    return fakeJob


def test_executePayload_preserves_minor_status_when_postProcess_set_failed():
    """executePayload on a job whose ``wmsMajorStatus`` is already FAILED.

    ``execute()`` returns a "Payload killed by watchdog" error. executePayload
    must return False, must never call ``jobReport.setJobStatus`` with
    EXCEPTION_DURING_EXEC, and must call ``sendJobAccounting`` exactly once
    with no arguments.
    """
    watchdogFailure = S_ERROR(f"Payload killed by watchdog: {JobMinorStatus.JOB_EXCEEDED_CPU}")
    mock_job = _make_mock_job(execute_result=watchdogFailure, wmsMajorStatus=JobStatus.FAILED)

    succeeded = executePayload(mock_job)
    assert succeeded is False

    # No recorded setJobStatus call may carry the generic exception minor status.
    reported_minors = [
        recorded.kwargs.get("minorStatus") for recorded in mock_job.jobReport.setJobStatus.call_args_list
    ]
    assert JobMinorStatus.EXCEPTION_DURING_EXEC not in reported_minors

    # Accounting is sent without any status / minorStatus override.
    mock_job.sendJobAccounting.assert_called_once_with()


def test_executePayload_falls_back_to_exception_when_postProcess_was_bypassed():
    """executePayload on a job whose ``wmsMajorStatus`` is still "Running".

    ``execute()`` returns a generic error. executePayload must return False,
    call ``jobReport.setJobStatus`` exactly once with FAILED /
    EXCEPTION_DURING_EXEC, and call ``sendJobAccounting`` exactly once with
    those same explicit keyword arguments.
    """
    startupFailure = S_ERROR("Payload process could not start after 5 seconds")
    mock_job = _make_mock_job(execute_result=startupFailure, wmsMajorStatus="Running")

    succeeded = executePayload(mock_job)
    assert succeeded is False

    # Exactly one setJobStatus call carries the fallback minor status, and it is FAILED.
    fallback_calls = []
    for recorded in mock_job.jobReport.setJobStatus.call_args_list:
        if recorded.kwargs.get("minorStatus") == JobMinorStatus.EXCEPTION_DURING_EXEC:
            fallback_calls.append(recorded)
    assert len(fallback_calls) == 1
    (only_call,) = fallback_calls
    assert only_call.kwargs["status"] == JobStatus.FAILED

    # In this branch the accounting call must name the failure explicitly.
    mock_job.sendJobAccounting.assert_called_once_with(
        status=JobStatus.FAILED,
        minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC,
    )


def test_executePayload_reschedule_path_unchanged():
    """executePayload when ``execute()`` returns an EWMSRESC error.

    executePayload must return False and call ``sendJobAccounting`` exactly
    once, with JOB_WRAPPER_EXECUTION as the ``minorStatus`` keyword argument.
    """
    rescheduleError = S_ERROR(DErrno.EWMSRESC, "Job will be rescheduled")
    mock_job = _make_mock_job(execute_result=rescheduleError, wmsMajorStatus=JobStatus.FAILED)

    succeeded = executePayload(mock_job)
    assert succeeded is False

    # The accounting call is made once, with explicit arguments.
    accounting = mock_job.sendJobAccounting
    accounting.assert_called_once()
    sent_kwargs = accounting.call_args.kwargs
    assert sent_kwargs.get("minorStatus") == JobMinorStatus.JOB_WRAPPER_EXECUTION
Loading