diff --git a/src/dvsim/flow/formal.py b/src/dvsim/flow/formal.py
index 8f196e16..b7bfb70f 100644
--- a/src/dvsim/flow/formal.py
+++ b/src/dvsim/flow/formal.py
@@ -10,6 +10,7 @@
 from dvsim.flow.one_shot import OneShotCfg
 from dvsim.job.data import CompletedJobStatus
+from dvsim.job.status import JobStatus
 from dvsim.logging import log
 from dvsim.utils import subst_wildcards
@@ -230,7 +231,7 @@ def _gen_results(self, results: Sequence[CompletedJobStatus]) -> None:
         assert len(self.deploy) == 1
         mode = self.deploy[0]
 
-        if complete_job.status == "P":
+        if complete_job.status == JobStatus.PASSED:
             result_data = Path(
                 subst_wildcards(self.build_dir, {"build_mode": mode.name}),
                 "results.hjson",
@@ -262,7 +263,7 @@ def _gen_results(self, results: Sequence[CompletedJobStatus]) -> None:
         else:
             summary += ["N/A", "N/A", "N/A"]
 
-        if complete_job.status != "P":
+        if complete_job.status != JobStatus.PASSED:
             results_str += "\n## List of Failures\n" + "".join(complete_job.fail_msg.message)
 
         messages = self.result.get("messages")
diff --git a/src/dvsim/flow/one_shot.py b/src/dvsim/flow/one_shot.py
index 486015c4..b8d86dd0 100644
--- a/src/dvsim/flow/one_shot.py
+++ b/src/dvsim/flow/one_shot.py
@@ -12,6 +12,7 @@
 from dvsim.flow.base import FlowCfg
 from dvsim.job.data import CompletedJobStatus
 from dvsim.job.deploy import CompileOneShot
+from dvsim.job.status import JobStatus
 from dvsim.logging import log
 from dvsim.modes import BuildMode, Mode
 from dvsim.utils import rm_path
@@ -92,10 +93,10 @@ def _expand(self) -> None:
         # Set directories with links for ease of debug / triage.
         self.links = {
-            "D": self.scratch_path + "/" + "dispatched",
-            "P": self.scratch_path + "/" + "passed",
-            "F": self.scratch_path + "/" + "failed",
-            "K": self.scratch_path + "/" + "killed",
+            JobStatus.DISPATCHED: self.scratch_path + "/" + "dispatched",
+            JobStatus.PASSED: self.scratch_path + "/" + "passed",
+            JobStatus.FAILED: self.scratch_path + "/" + "failed",
+            JobStatus.KILLED: self.scratch_path + "/" + "killed",
         }
 
         # Use the default build mode for tests that do not specify it
diff --git a/src/dvsim/job/data.py b/src/dvsim/job/data.py
index cd3b42c5..5dda0622 100644
--- a/src/dvsim/job/data.py
+++ b/src/dvsim/job/data.py
@@ -14,6 +14,7 @@
 from pydantic import BaseModel, ConfigDict
 
+from dvsim.job.status import JobStatus
 from dvsim.launcher.base import ErrorMessage, Launcher
 from dvsim.report.data import IPMeta, ToolMeta
@@ -97,13 +98,13 @@ class JobSpec(BaseModel):
     """Output directory for the job results files."""
     log_path: Path
     """Path for the job log file."""
-    links: Mapping[str, Path]
+    links: Mapping[JobStatus, Path]
     """Path for links directories."""
 
     # TODO: remove the need for these callables here
     pre_launch: Callable[[Launcher], None]
     """Callback function for pre-launch actions."""
-    post_finish: Callable[[str], None]
+    post_finish: Callable[[JobStatus], None]
     """Callback function for tidy up actions once the job is finished."""
 
     pass_patterns: Sequence[str]
@@ -153,7 +154,7 @@ class CompletedJobStatus(BaseModel):
     simulated_time: float
     """Simulation time."""
 
-    status: str
-    """Job status string [P,F,K,...]"""
+    status: JobStatus
+    """Status of the job."""
 
     fail_msg: ErrorMessage
     """Error message."""
diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py
index 9a55d8bf..f91cfbd7 100644
--- a/src/dvsim/job/deploy.py
+++ b/src/dvsim/job/deploy.py
@@ -14,6 +14,7 @@
 from tabulate import tabulate
 
 from dvsim.job.data import JobSpec, WorkspaceConfig
+from dvsim.job.status import JobStatus
 from dvsim.job.time import JobTime
 from dvsim.launcher.base import Launcher
 from dvsim.logging import log
@@ -347,10 +348,10 @@ def callback(launcher: Launcher) -> None:
         return callback
 
-    def post_finish(self) -> Callable[[str], None]:
+    def post_finish(self) -> Callable[[JobStatus], None]:
         """Get post finish callback."""
 
-        def callback(status: str) -> None:
+        def callback(status: JobStatus) -> None:
             """Perform additional post-finish activities (callback).
 
             This is invoked by launcher::_post_finish().
@@ -641,12 +642,12 @@ def callback(launcher: Launcher) -> None:
         return callback
 
-    def post_finish(self) -> Callable[[str], None]:
+    def post_finish(self) -> Callable[[JobStatus], None]:
         """Get post finish callback."""
 
-        def callback(status: str) -> None:
+        def callback(status: JobStatus) -> None:
             """Perform tidy up tasks."""
-            if status != "P":
+            if status != JobStatus.PASSED:
                 # Delete the coverage data if available.
                 rm_path(self.cov_db_test_dir)
@@ -812,16 +813,16 @@ def _set_attrs(self) -> None:
         self.cov_results = ""
         self.cov_results_dict = {}
 
-    def post_finish(self) -> Callable[[str], None]:
+    def post_finish(self) -> Callable[[JobStatus], None]:
         """Get post finish callback."""
 
-        def callback(status: str) -> None:
+        def callback(status: JobStatus) -> None:
             """Extract the coverage results summary for the dashboard.
 
             If the extraction fails, an appropriate exception is raised, which
             must be caught by the caller to mark the job as a failure.
             """
-            if self.dry_run or status != "P":
+            if self.dry_run or status != JobStatus.PASSED:
                 return
 
             plugin = get_sim_tool_plugin(tool=self.sim_cfg.tool)
diff --git a/src/dvsim/job/status.py b/src/dvsim/job/status.py
new file mode 100644
index 00000000..55dabe39
--- /dev/null
+++ b/src/dvsim/job/status.py
@@ -0,0 +1,29 @@
+# Copyright lowRISC contributors (OpenTitan project).
+# Licensed under the Apache License, Version 2.0, see LICENSE for details.
+# SPDX-License-Identifier: Apache-2.0
+
+"""An enum definition for the various job statuses."""
+
+from enum import Enum
+
+__all__ = ("JobStatus",)
+
+
+class JobStatus(Enum):
+    """Status of a Job."""
+
+    QUEUED = 0
+    DISPATCHED = 1
+    PASSED = 2
+    FAILED = 3
+    KILLED = 4
+
+    @property
+    def shorthand(self) -> str:
+        """Shorthand for the job status, e.g. 'D' for 'Dispatched'."""
+        return self.name[0]
+
+    @property
+    def ended(self) -> bool:
+        """Whether this status corresponds to some ended job."""
+        return self in (JobStatus.PASSED, JobStatus.FAILED, JobStatus.KILLED)
diff --git a/src/dvsim/launcher/base.py b/src/dvsim/launcher/base.py
index f361591c..f1cddb03 100644
--- a/src/dvsim/launcher/base.py
+++ b/src/dvsim/launcher/base.py
@@ -15,6 +15,7 @@
 from pydantic import BaseModel, ConfigDict
 
+from dvsim.job.status import JobStatus
 from dvsim.job.time import JobTime
 from dvsim.logging import log
 from dvsim.tool.utils import get_sim_tool_plugin
@@ -118,7 +119,7 @@ def __init__(self, job_spec: "JobSpec") -> None:
         # _check_status() method, but eventually updated by the _post_finish()
         # method, in case any of the cleanup tasks fails. This value is finally
         # returned to the Scheduler by the poll() method.
-        self.status = None
+        self.status = JobStatus.QUEUED
 
         # Return status of the process running the job.
         self.exit_code = None
@@ -210,18 +211,18 @@ def _make_odir(self) -> None:
         Path(self.job_spec.odir).mkdir(exist_ok=True, parents=True)
 
-    def _link_odir(self, status: str) -> None:
+    def _link_odir(self, status: JobStatus) -> None:
         """Soft-links the job's directory based on job's status.
 
-        The dispatched, passed and failed directories in the scratch area
+        The DISPATCHED, PASSED and FAILED directories in the scratch area
         provide a quick way to get to the job that was executed.
         """
         dest = Path(self.job_spec.links[status], self.job_spec.qual_name)
         mk_symlink(path=self.job_spec.odir, link=dest)
 
         # Delete the symlink from dispatched directory if it exists.
-        if status != "D":
-            old = Path(self.job_spec.links["D"], self.job_spec.qual_name)
+        if status != JobStatus.DISPATCHED:
+            old = Path(self.job_spec.links[JobStatus.DISPATCHED], self.job_spec.qual_name)
             rm_path(old)
 
     def _dump_env_vars(self, exports: Mapping[str, str]) -> None:
@@ -258,13 +259,13 @@ def launch(self) -> None:
         self._do_launch()
 
     @abstractmethod
-    def poll(self) -> str | None:
+    def poll(self) -> JobStatus:
        """Poll the launched job for completion.
 
        Invokes _check_status() and _post_finish() when the job completes.
 
        Returns:
-            status of the job or None
+            status of the job
 
        """
@@ -272,14 +273,14 @@ def kill(self) -> None:
         """Terminate the job."""
 
-    def _check_status(self) -> tuple[str, ErrorMessage | None]:
-        """Determine the outcome of the job (P/F if it ran to completion).
+    def _check_status(self) -> tuple[JobStatus, ErrorMessage | None]:
+        """Determine the outcome of the job (PASSED/FAILED if it ran to completion).
 
        Returns:
            (status, err_msg) extracted from the log, where the status is
-            "P" if the it passed, "F" otherwise. This is invoked by poll() just
-            after the job finishes. err_msg is an instance of the named tuple
-            ErrorMessage.
+            PASSED if the job passed, FAILED otherwise. This is invoked by
+            poll() just after the job finishes. err_msg is an instance of the
+            named tuple ErrorMessage.
 
        """
@@ -307,7 +308,7 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None:
             return None
 
         if self.job_spec.dry_run:
-            return "P", None
+            return JobStatus.PASSED, None
 
         # Only one fail pattern needs to be seen.
         chk_failed = bool(self.job_spec.fail_patterns)
@@ -324,7 +325,7 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None:
             ) as f:
                 lines = f.readlines()
         except OSError as e:
-            return "F", ErrorMessage(
+            return JobStatus.FAILED, ErrorMessage(
                 line_number=None,
                 message=f"Error opening file {self.job_spec.log_path}:\n{e}",
                 context=[],
@@ -368,7 +369,7 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None:
                 # If failed, then nothing else to do. Just return.
                 # Provide some extra lines for context.
                 end = cnt + 5
-                return "F", ErrorMessage(
+                return JobStatus.FAILED, ErrorMessage(
                     line_number=cnt + 1,
                     message=line.strip(),
                     context=lines[cnt:end],
@@ -384,32 +385,32 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None:
         # exit code for whatever reason, then show the last 10 lines of the log
         # as the failure message, which might help with the debug.
         if self.exit_code != 0:
-            return "F", ErrorMessage(
+            return JobStatus.FAILED, ErrorMessage(
                 line_number=None,
                 message="Job returned non-zero exit code",
                 context=lines[-10:],
             )
 
         if chk_passed:
-            return "F", ErrorMessage(
+            return JobStatus.FAILED, ErrorMessage(
                 line_number=None,
                 message=f"Some pass patterns missing: {pass_patterns}",
                 context=lines[-10:],
             )
 
-        return "P", None
+        return JobStatus.PASSED, None
 
-    def _post_finish(self, status: str, err_msg: ErrorMessage) -> None:
+    def _post_finish(self, status: JobStatus, err_msg: ErrorMessage) -> None:
         """Do post-completion activities, such as preparing the results.
 
         Must be invoked by poll(), after the job outcome is determined.
 
         Args:
-            status: status of the job, either 'P', 'F' or 'K'.
+            status: status of the completed job (must be either PASSED, FAILED or KILLED).
             err_msg: an instance of the named tuple ErrorMessage.
 
         """
-        assert status in ["P", "F", "K"]
+        assert status.ended
         self._link_odir(status)
 
-        log.debug("Item %s has completed execution: %s", self, status)
+        log.debug("Item %s has completed execution: %s", self, status.shorthand)
 
         try:
             # Run the target-specific cleanup tasks regardless of the job's
@@ -419,8 +420,8 @@ def _post_finish(self, status: str, err_msg: ErrorMessage) -> None:
         except Exception as e:
             # If the job had already failed, then don't do anything. If it's
             # cleanup task failed, then mark the job as failed.
-            if status == "P":
-                status = "F"
+            if status == JobStatus.PASSED:
+                status = JobStatus.FAILED
                 err_msg = ErrorMessage(
                     line_number=None,
                     message=f"{e}",
@@ -428,7 +429,7 @@ def _post_finish(self, status: str, err_msg: ErrorMessage) -> None:
                 )
 
         self.status = status
-        if self.status != "P":
+        if self.status != JobStatus.PASSED:
             assert err_msg
             assert isinstance(err_msg, ErrorMessage)
             self.fail_msg = err_msg
diff --git a/src/dvsim/launcher/fake.py b/src/dvsim/launcher/fake.py
index 066b31df..d64fcb86 100644
--- a/src/dvsim/launcher/fake.py
+++ b/src/dvsim/launcher/fake.py
@@ -7,6 +7,7 @@
 from random import choice
 from typing import TYPE_CHECKING
 
+from dvsim.job.status import JobStatus
 from dvsim.launcher.base import ErrorMessage, Launcher
 
 if TYPE_CHECKING:
@@ -16,12 +17,12 @@
 __all__ = ("FakeLauncher",)
 
 
-def _run_test_handler(job_spec: "JobSpec") -> str:
+def _run_test_handler(job_spec: "JobSpec") -> JobStatus:
     """Handle a RunTest deploy job."""
-    return choice(("P", "F"))
+    return choice((JobStatus.PASSED, JobStatus.FAILED))
 
 
-def _cov_report_handler(job_spec: "JobSpec") -> str:
+def _cov_report_handler(job_spec: "JobSpec") -> JobStatus:
     """Handle a CompileSim deploy job."""
     # TODO: this hack doesn't work any more and needs implementing by writing
     # a file that can be parsed as if it's been generated by the tool.
@@ -38,7 +39,7 @@ def _cov_report_handler(job_spec: "JobSpec") -> JobStatus:
     #     ]
     #     job_spec.cov_results_dict = {k: f"{random() * 100:.2f} %" for k in keys}
 
-    return "P"
+    return JobStatus.PASSED
 
 
 _DEPLOY_HANDLER = {
@@ -56,19 +57,19 @@ class FakeLauncher(Launcher):
     def _do_launch(self) -> None:
         """Do the launch."""
 
-    def poll(self) -> str | None:
+    def poll(self) -> JobStatus:
         """Check status of the running process."""
         deploy_cls = self.job_spec.job_type
 
         if deploy_cls in _DEPLOY_HANDLER:
             return _DEPLOY_HANDLER[deploy_cls](job_spec=self.job_spec)
 
         # Default result is Pass
-        return "P"
+        return JobStatus.PASSED
 
     def kill(self) -> None:
         """Kill the running process."""
         self._post_finish(
-            "K",
+            JobStatus.KILLED,
             ErrorMessage(line_number=None, message="Job killed!", context=[]),
         )
diff --git a/src/dvsim/launcher/local.py b/src/dvsim/launcher/local.py
index d0f2c14c..8132844b 100644
--- a/src/dvsim/launcher/local.py
+++ b/src/dvsim/launcher/local.py
@@ -10,6 +10,7 @@
 import subprocess
 from typing import TYPE_CHECKING
 
+from dvsim.job.status import JobStatus
 from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError
 
 if TYPE_CHECKING:
@@ -100,20 +101,24 @@ def _do_launch(self) -> None:
             # Wait until the process exit
             self._process.wait()
 
-        self._link_odir("D")
+        self._link_odir(JobStatus.DISPATCHED)
 
-    def poll(self) -> str | None:
+    def poll(self) -> JobStatus:
         """Check status of the running process.
 
-        This returns 'D', 'P', 'F', or 'K'. If 'D', the job is still running.
-        If 'P', the job finished successfully. If 'F', the job finished with
-        an error. If 'K' it was killed.
+        This returns a job status. If DISPATCHED, the job is still running.
+        If PASSED, the job finished successfully. If FAILED, the job finished
+        with an error. If KILLED, it was killed.
 
         This function must only be called after running self.dispatch_cmd() and
-        must not be called again once it has returned 'P' or 'F'.
+        must not be called again once it has returned PASSED or FAILED.
         """
         if self._process is None:
-            return "E"
+            msg = (
+                "poll() was called either before calling launch() or after "
+                "ignoring a LauncherError from launch()."
+            )
+            raise LauncherError(msg)
 
         elapsed_time = datetime.datetime.now() - self.start_time
         self.job_runtime_secs = elapsed_time.total_seconds()
@@ -127,16 +132,16 @@
                 timeout_mins = self.job_spec.timeout_mins
                 timeout_message = f"Job timed out after {timeout_mins} minutes"
                 self._post_finish(
-                    "K",
+                    JobStatus.KILLED,
                     ErrorMessage(
                         line_number=None,
                         message=timeout_message,
                         context=[timeout_message],
                     ),
                 )
-                return "K"
+                return JobStatus.KILLED
 
-            return "D"
+            return JobStatus.DISPATCHED
 
         self.exit_code = self._process.returncode
         status, err_msg = self._check_status()
@@ -168,7 +173,7 @@ def kill(self) -> None:
         """
         self._kill()
         self._post_finish(
-            "K",
+            JobStatus.KILLED,
             ErrorMessage(line_number=None, message="Job killed!", context=[]),
         )
diff --git a/src/dvsim/launcher/lsf.py b/src/dvsim/launcher/lsf.py
index f177cbc6..ec75ddc0 100644
--- a/src/dvsim/launcher/lsf.py
+++ b/src/dvsim/launcher/lsf.py
@@ -9,6 +9,7 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, ClassVar
 
+from dvsim.job.status import JobStatus
 from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError
 from dvsim.logging import log
 from dvsim.utils import clean_odirs
@@ -270,23 +271,23 @@ def _do_launch(self) -> None:
         for job in LsfLauncher.jobs[cfg][job_name]:
             job.bsub_out = Path(f"{job_script}.{job.index}.out")
             job.job_id = f"{job_id}[{job.index}]"
-            job._link_odir("D")
+            job._link_odir(JobStatus.DISPATCHED)
 
-    def poll(self) -> str | None:
+    def poll(self) -> JobStatus:
         """Poll the status of the job.
 
         Returns:
-            status of the job or None
+            status of the job
 
         """
         # It is possible we may have determined the status already.
-        if self.status:
+        if self.status is not JobStatus.QUEUED:
             return self.status
 
         if not self.bsub_out_fd:
             # If job id is not set, the bsub command has not been sent yet.
             if not self.job_id:
-                return "D"
+                return JobStatus.DISPATCHED
 
             # We redirect the job's output to the log file, so the job script
             # output remains empty until the point it finishes. This is a very
@@ -295,10 +296,10 @@
             # created), then the job is still running.
             try:
                 if not self.bsub_out.stat().st_size:
-                    return "D"
+                    return JobStatus.DISPATCHED
 
             except FileNotFoundError:
-                return "D"
+                return JobStatus.DISPATCHED
 
             # If we got to this point, we can now open the job script output
             # file for reading.
@@ -307,14 +308,14 @@
             except OSError as e:
                 self._post_finish(
-                    "F",
+                    JobStatus.FAILED,
                     ErrorMessage(
                         line_number=None,
                         message=f"ERROR: Failed to open {self.bsub_out}\n{e}.",
                         context=[e],
                     ),
                 )
-                return "F"
+                return JobStatus.FAILED
 
         # Now that the job has completed, we need to determine its status.
         #
@@ -360,7 +361,7 @@
         # Fail the test if we have reached the max polling retries.
         if self.num_poll_retries == LsfLauncher.max_poll_retries:
             self._post_finish(
-                "F",
+                JobStatus.FAILED,
                 ErrorMessage(
                     message="ERROR: Reached max retries while "
                     f"reading job script output {self.bsub_out} to determine"
@@ -368,9 +369,9 @@
                     " its exit code.",
                     context=[],
                 ),
             )
-            return "F"
+            return JobStatus.FAILED
 
-        return "D"
+        return JobStatus.DISPATCHED
 
     def _get_job_exit_code(self) -> int | None:
         """Read the job script output to retrieve the exit code.
@@ -437,14 +438,14 @@ def kill(self) -> None:
         else:
             log.error("Job ID for %s not found", self.job_spec.full_name)
 
-        self._post_finish("K", ErrorMessage(message="Job killed!", context=[]))
+        self._post_finish(JobStatus.KILLED, ErrorMessage(message="Job killed!", context=[]))
 
     def _post_finish(self, status: str, err_msg: ErrorMessage) -> None:
         """Tidy up after the job is complete."""
         if self.bsub_out_fd:
             self.bsub_out_fd.close()
         if self.exit_code is None:
-            self.exit_code = 0 if status == "P" else 1
+            self.exit_code = 0 if status == JobStatus.PASSED else 1
         super()._post_finish(status, err_msg)
 
     @staticmethod
     def _post_finish_job_array(
         cfg,
         job_name: str,
         err_msg: str,
     ) -> None:
-        """On LSF error, mark all jobs in this array as killed.
+        """On LSF error, mark all jobs in this array as FAILED.
 
         Args:
             cfg: workspace configuration
 
         """
         for job in LsfLauncher.jobs[cfg.project][job_name]:
             job._post_finish(
-                "F",
+                JobStatus.FAILED,
                 ErrorMessage(line_number=None, message=err_msg, context=[err_msg]),
             )
diff --git a/src/dvsim/launcher/nc.py b/src/dvsim/launcher/nc.py
index c4a41496..9c326032 100644
--- a/src/dvsim/launcher/nc.py
+++ b/src/dvsim/launcher/nc.py
@@ -11,6 +11,7 @@
 import sys
 from typing import TYPE_CHECKING
 
+from dvsim.job.status import JobStatus
 from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError
 from dvsim.logging import log
 from dvsim.utils import rm_path
@@ -162,20 +163,20 @@ def _do_launch(self) -> None:
         finally:
             self._close_process()
 
-        self._link_odir("D")
+        self._link_odir(JobStatus.DISPATCHED)
 
     def minutes_since_start(self):
         return (datetime.datetime.now() - self.start_time).total_seconds() / 60
 
-    def poll(self):
+    def poll(self) -> JobStatus:
         """Check status of the running process.
 
-        This returns 'D', 'P', 'F', or 'K'. If 'D', the job is still running.
-        If 'P', the job finished successfully. If 'F', the job finished with
-        an error. If 'K' it was killed.
+        This returns a job status. If DISPATCHED, the job is still running.
+        If PASSED, the job finished successfully. If FAILED, the job finished
+        with an error. If KILLED, it was killed.
 
         This function must only be called after running self.dispatch_cmd() and
-        must not be called again once it has returned 'P' or 'F'.
+        must not be called again once it has returned PASSED or FAILED.
""" assert self.process is not None if self.process.poll() is None: @@ -215,16 +216,15 @@ def poll(self): elif self.nc_job_state == "wait_timeout": timeout_message = f"Job timed out after waiting {wait_timeout_mins} mins" self._post_finish( - "K", + JobStatus.KILLED, ErrorMessage( line_number=None, message=timeout_message, context=[timeout_message], ), ) - return "K" - return "D" - return "D" + return JobStatus.KILLED + return JobStatus.DISPATCHED self.exit_code = self.process.returncode status, err_msg = self._check_status() @@ -255,7 +255,9 @@ def kill(self) -> None: """ self._kill() - self._post_finish("K", ErrorMessage(line_number=None, message="Job killed!", context=[])) + self._post_finish( + JobStatus.KILLED, ErrorMessage(line_number=None, message="Job killed!", context=[]) + ) def _post_finish(self, status, err_msg) -> None: self._close_process() diff --git a/src/dvsim/launcher/sge/launcher.py b/src/dvsim/launcher/sge/launcher.py index 2eab0228..24411fd6 100644 --- a/src/dvsim/launcher/sge/launcher.py +++ b/src/dvsim/launcher/sge/launcher.py @@ -11,6 +11,7 @@ from subprocess import PIPE, Popen from typing import TYPE_CHECKING +from dvsim.job.status import JobStatus from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError from dvsim.launcher.sge.engine import * # noqa: F403 @@ -93,21 +94,22 @@ def _do_launch(self) -> None: finally: self._close_process() - self._link_odir("D") + self._link_odir(JobStatus.DISPATCHED) f.close() - def poll(self) -> str: + def poll(self) -> JobStatus: """Check status of the running process. - This returns 'D', 'P' or 'F'. If 'D', the job is still running. If 'P', - the job finished successfully. If 'F', the job finished with an error. + This returns a job status. If DISPATCHED, the job is still running. + If PASSED, the job finished successfully. If FAILED, the job finished + with an error. If KILLED, it was killed. This function must only be called after running self.dispatch_cmd() and - must not be called again once it has returned 'P' or 'F'. + must not be called again once it has returned PASSED or FAILED. """ assert self.process is not None if self.process.poll() is None: - return "D" + return JobStatus.DISPATCHED # ------------------------------------- # copy SGE job results to log file sge_log_path = Path(f"{self.job_spec.log_path}.sge") @@ -157,7 +159,9 @@ def kill(self) -> None: output = process.communicate()[0].decode("utf-8") output = output.rstrip("\n") # ---------------------------- - self._post_finish("K", ErrorMessage(line_number=None, message="Job killed!", context=[])) + self._post_finish( + JobStatus.KILLED, ErrorMessage(line_number=None, message="Job killed!", context=[]) + ) def _post_finish(self, status, err_msg) -> None: super()._post_finish(status, err_msg) diff --git a/src/dvsim/launcher/slurm.py b/src/dvsim/launcher/slurm.py index deab4bc5..218d95c7 100644 --- a/src/dvsim/launcher/slurm.py +++ b/src/dvsim/launcher/slurm.py @@ -9,6 +9,7 @@ import subprocess from typing import TYPE_CHECKING +from dvsim.job.status import JobStatus from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError from dvsim.logging import log @@ -85,20 +86,21 @@ def _do_launch(self) -> None: finally: self._close_process() - self._link_odir("D") + self._link_odir(JobStatus.DISPATCHED) - def poll(self): + def poll(self) -> JobStatus: """Check status of the running process. - This returns 'D', 'P' or 'F'. If 'D', the job is still running. If 'P', - the job finished successfully. If 'F', the job finished with an error. 
+        This returns a job status. If DISPATCHED, the job is still running.
+        If PASSED, the job finished successfully. If FAILED, the job finished
+        with an error. If KILLED, it was killed.
 
         This function must only be called after running self.dispatch_cmd() and
-        must not be called again once it has returned 'P' or 'F'.
+        must not be called again once it has returned PASSED or FAILED.
         """
         assert self.process is not None
         if self.process.poll() is None:
-            return "D"
+            return JobStatus.DISPATCHED
 
         # Copy slurm job results to log file
         if pathlib.Path(self.slurm_log_file).exists():
@@ -138,7 +140,9 @@ def kill(self) -> None:
             self.process.wait(timeout=2)
         except subprocess.TimeoutExpired:
             self.process.kill()
-        self._post_finish("K", ErrorMessage(line_number=None, message="Job killed!", context=[]))
+        self._post_finish(
+            JobStatus.KILLED, ErrorMessage(line_number=None, message="Job killed!", context=[])
+        )
 
     def _post_finish(self, status, err_msg) -> None:
         super()._post_finish(status, err_msg)
diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py
index c2519799..ee1f99ed 100644
--- a/src/dvsim/scheduler.py
+++ b/src/dvsim/scheduler.py
@@ -13,11 +13,13 @@
     MutableSet,
     Sequence,
 )
+from itertools import dropwhile
 from signal import SIGINT, SIGTERM, signal
 from types import FrameType
 from typing import TYPE_CHECKING, Any
 
 from dvsim.job.data import CompletedJobStatus, JobSpec
+from dvsim.job.status import JobStatus
 from dvsim.launcher.base import Launcher, LauncherBusyError, LauncherError
 from dvsim.logging import log
 from dvsim.utils.status_printer import get_status_printer
@@ -28,7 +30,7 @@ def total_sub_items(
-    d: Mapping[str, Sequence[JobSpec]] | Mapping["FlowCfg", Sequence[JobSpec]],
+    d: Mapping[str, Sequence[str]] | Mapping["FlowCfg", Sequence[JobSpec]],
 ) -> int:
     """Return the total number of sub items in a mapping.
@@ -151,10 +153,9 @@ def __init__(
             self._status_printer.init_target(target=target, msg=msg)
 
         # A map from the job names tracked by this class to their
-        # current status. This status is 'Q', 'D', 'P', 'F' or 'K',
-        # corresponding to membership in the dicts above. This is not
-        # per-target.
-        self.job_status: MutableMapping[str, str] = {}
+        # current status, corresponding to membership in the dicts
+        # above. This is not per-target.
+        self.job_status: MutableMapping[str, JobStatus] = {}
 
         # Create the launcher instance for all items.
         self._launchers: Mapping[str, Launcher] = {
@@ -299,7 +300,7 @@ def _enqueue_successors(self, job_name: str | None = None) -> None:
                 msg = f"Job {next_job_name} already scheduled"
                 raise RuntimeError(msg)
 
-            self.job_status[next_job_name] = "Q"
+            self.job_status[next_job_name] = JobStatus.QUEUED
             self._queued[target].append(next_job_name)
             self._unschedule_item(next_job_name)
@@ -318,13 +319,34 @@ def _cancel_successors(self, job_name: str) -> None:
             self._cancel_item(next_item, cancel_successors=False)
             items.extend(self._get_successors(next_item))
 
+    def _get_successor_target(self, job_name: str) -> str | None:
+        """Find the first target in the scheduled list that follows the target of a given job.
+
+        Args:
+            job_name: name of the job (to find the successor target of).
+
+        Returns:
+            the successor target, or None if there is no successor.
+
+        """
+        job: JobSpec = self._jobs[job_name]
+
+        if job.target not in self._scheduled:
+            msg = f"Scheduler does not contain target {job.target}"
+            raise KeyError(msg)
+
+        # Find the first target that follows the target in the scheduled list.
+        target_iter = dropwhile(lambda x: x != job.target, self._scheduled)
+        next(target_iter, None)
+        return next(target_iter, None)
+
     def _get_successors(self, job_name: str | None = None) -> Sequence[str]:
         """Find immediate successors of an item.
 
-        We choose the target that follows the 'item''s current target and find
-        the list of successors whose dependency list contains 'item'. If 'item'
-        is None, we pick successors from all cfgs, else we pick successors only
-        from the cfg to which the item belongs.
+        We choose the target that follows the item's current target and find
+        the list of successors whose dependency list contains "job_name". If
+        "job_name" is None, we pick successors from all cfgs, else we pick
+        successors only from the cfg to which the item belongs.
 
         Args:
             job_name: name of the job
@@ -334,39 +356,16 @@ def _get_successors(self, job_name: str | None = None) -> Sequence[str]:
         """
         if job_name is None:
-            target = next(iter(self._scheduled))
-
-            if target is None:
-                return []
-
-            cfgs = set(self._scheduled[target])
-
+            target = next(iter(self._scheduled), None)
+            cfgs = set() if target is None else set(self._scheduled[target])
         else:
+            target = self._get_successor_target(job_name)
             job: JobSpec = self._jobs[job_name]
-
-            if job.target not in self._scheduled:
-                msg = f"Scheduler does not contain target {job.target}"
-                raise KeyError(msg)
-
-            target_iterator = iter(self._scheduled)
-            target = next(target_iterator)
-
-            found = False
-            while not found:
-                if target == job.target:
-                    found = True
-
-                try:
-                    target = next(target_iterator)
-
-                except StopIteration:
-                    return []
-
-            if target is None:
-                return []
-
             cfgs = {job.block.name}
 
+        if target is None:
+            return ()
+
         # Find item's successors that can be enqueued. We assume here that
         # only the immediately succeeding target can be enqueued at this
         # time.
@@ -375,9 +374,10 @@
         for next_item in self._scheduled[target][cfg]:
             if job_name is not None:
                 job = self._jobs[next_item]
-                # Something is terribly wrong if item exists but the
-                # next_item's dependency list is empty.
-                assert job.dependencies
+                if not job.dependencies:
+                    raise RuntimeError(
+                        "Job item exists but the next item's dependency list is empty?"
+                    )
 
                 if job_name not in job.dependencies:
                     continue
@@ -406,7 +406,7 @@ def _ok_to_enqueue(self, job_name: str) -> bool:
                 return False
 
             # Has the dep completed?
-            if self.job_status[dep] not in ["P", "F", "K"]:
+            if not self.job_status[dep].ended:
                 return False
 
         return True
@@ -434,14 +434,14 @@ def _ok_to_run(self, job_name: str) -> bool:
                 continue
 
             dep_status = self.job_status[dep_name]
-            if dep_status not in ["P", "F", "K"]:
-                raise ValueError("Status must be one of P, F, or K")
+            if not dep_status.ended:
+                msg = f"Expected dependent job {dep_name} to be ended, not {dep_status.name}."
+                raise ValueError(msg)
 
             if job.needs_all_dependencies_passing:
-                if dep_status in ["F", "K"]:
+                if dep_status != JobStatus.PASSED:
                     return False
-
-            elif dep_status in ["P"]:
+            elif dep_status == JobStatus.PASSED:
                 return True
 
         return job.needs_all_dependencies_passing
@@ -477,25 +477,25 @@ def _poll(self, hms: str) -> bool:
                 self._running[target],
                 self._last_item_polled_idx[target],
             )
-            status = self._launchers[job_name].poll()
+            try:
+                status = self._launchers[job_name].poll()
+            except LauncherError as e:
+                log.error("Error when dispatching target: %s", str(e))
+                status = JobStatus.KILLED
 
             level = log.VERBOSE
 
-            if status not in ["D", "P", "F", "E", "K"]:
-                msg = f"Status must be one of D, P, F, E or K but found {status}"
-                raise ValueError(msg)
-
-            if status == "D":
+            if status == JobStatus.DISPATCHED:
                 continue
 
-            if status == "P":
+            if status == JobStatus.PASSED:
                 self._passed[target].add(job_name)
 
-            elif status == "F":
+            elif status == JobStatus.FAILED:
                 self._failed[target].add(job_name)
                 level = log.ERROR
 
             else:
-                # Killed or Error dispatching
+                # Killed, still Queued, or some error when dispatching.
                 self._killed[target].add(job_name)
                 level = log.ERROR
@@ -509,7 +509,7 @@ def _poll(self, hms: str) -> bool:
                 hms,
                 target,
                 job_name,
-                status,
+                status.shorthand,
             )
 
             # Enqueue item's successors regardless of its status.
@@ -524,8 +524,45 @@ def _poll(self, hms: str) -> bool:
         return changed
 
+    def _dispatch_job(self, hms: str, target: str, job_name: str) -> None:
+        """Dispatch the named queued job.
+
+        Args:
+            hms: time as a string formatted in hh:mm:ss
+            target: the target to dispatch this job to
+            job_name: the name of the job to dispatch
+
+        """
+        try:
+            self._launchers[job_name].launch()
+
+        except LauncherError:
+            log.exception("Error launching %s", job_name)
+            self._kill_item(job_name)
+
+        except LauncherBusyError:
+            log.exception("Launcher busy")
+
+            self._queued[target].append(job_name)
+
+            log.verbose(
+                "[%s]: [%s]: [requeued]: %s",
+                hms,
+                target,
+                job_name,
+            )
+            return
+
+        self._running[target].append(job_name)
+        self.job_status[job_name] = JobStatus.DISPATCHED
+
     def _dispatch(self, hms: str) -> None:
-        """Dispatch some queued items if possible."""
+        """Dispatch some queued items if possible.
+
+        Args:
+            hms: time as a string formatted in hh:mm:ss
+
+        """
         slots = self._launcher_cls.max_parallel - total_sub_items(self._running)
         if slots <= 0:
             return
@@ -589,28 +626,7 @@ def _dispatch(self, hms: str) -> None:
             )
 
             for job_name in to_dispatch:
-                try:
-                    self._launchers[job_name].launch()
-
-                except LauncherError:
-                    log.exception("Error launching %s", job_name)
-                    self._kill_item(job_name)
-
-                except LauncherBusyError:
-                    log.exception("Launcher busy")
-
-                    self._queued[target].append(job_name)
-
-                    log.verbose(
-                        "[%s]: [%s]: [reqeued]: %s",
-                        hms,
-                        target,
-                        job_name,
-                    )
-                    continue
-
-                self._running[target].append(job_name)
-                self.job_status[job_name] = "D"
+                self._dispatch_job(hms, target, job_name)
 
     def _kill(self) -> None:
         """Kill any running items and cancel any that are waiting."""
@@ -680,7 +696,7 @@ def _cancel_item(self, job_name: str, *, cancel_successors: bool = True) -> None
         """
         target = self._jobs[job_name].target
 
-        self.job_status[job_name] = "K"
+        self.job_status[job_name] = JobStatus.KILLED
         self._killed[target].add(job_name)
         if job_name in self._queued[target]:
             self._queued[target].remove(job_name)
@@ -699,7 +715,7 @@ def _kill_item(self, job_name: str) -> None:
         """
         target = self._jobs[job_name].target
         self._launchers[job_name].kill()
-        self.job_status[job_name] = "K"
+        self.job_status[job_name] = JobStatus.KILLED
         self._killed[target].add(job_name)
         self._running[target].remove(job_name)
         self._cancel_successors(job_name)
diff --git a/src/dvsim/sim/flow.py b/src/dvsim/sim/flow.py
index d9e3f86c..c5fd3a06 100644
--- a/src/dvsim/sim/flow.py
+++ b/src/dvsim/sim/flow.py
@@ -22,6 +22,7 @@
     CovUnr,
     RunTest,
 )
+from dvsim.job.status import JobStatus
 from dvsim.logging import log
 from dvsim.modes import BuildMode, Mode, RunMode, find_mode
 from dvsim.regression import Regression
@@ -214,10 +215,10 @@ def _expand(self) -> None:
         # Set directories with links for ease of debug / triage.
         self.links = {
-            "D": self.scratch_path + "/" + "dispatched",
-            "P": self.scratch_path + "/" + "passed",
-            "F": self.scratch_path + "/" + "failed",
-            "K": self.scratch_path + "/" + "killed",
+            JobStatus.DISPATCHED: self.scratch_path + "/" + "dispatched",
+            JobStatus.PASSED: self.scratch_path + "/" + "passed",
+            JobStatus.FAILED: self.scratch_path + "/" + "failed",
+            JobStatus.KILLED: self.scratch_path + "/" + "killed",
         }
 
         # Use the default build mode for tests that do not specify it
diff --git a/src/dvsim/sim_results.py b/src/dvsim/sim_results.py
index c9a3c6ac..9c753482 100644
--- a/src/dvsim/sim_results.py
+++ b/src/dvsim/sim_results.py
@@ -10,6 +10,7 @@
 from pydantic import BaseModel, ConfigDict
 
+from dvsim.job.status import JobStatus
 from dvsim.testplan import Result
 
 if TYPE_CHECKING:
@@ -124,7 +125,7 @@ def from_job_status(results: Sequence["CompletedJobStatus"]) -> "BucketedFailure
         buckets = {}
 
         for job_status in results:
-            if job_status.status in ["F", "K"]:
+            if job_status.status in (JobStatus.FAILED, JobStatus.KILLED):
                 bucket = _bucketize(job_status.fail_msg.message)
 
                 if bucket not in buckets:
@@ -186,6 +187,6 @@ def _add_run(self, job_status: "CompletedJobStatus") -> None:
         row.job_runtime = job_status.job_runtime
         row.simulated_time = job_status.simulated_time
 
-        if job_status.status == "P":
+        if job_status.status == JobStatus.PASSED:
             row.passing += 1
         row.total += 1
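
Reviewer note (not part of the patch): below is a minimal, hypothetical sketch of how the new JobStatus enum introduced in src/dvsim/job/status.py replaces the old single-letter status strings. The enum body mirrors the new file; scratch_path and the standalone links dict are stand-ins for the launcher/scheduler plumbing rather than dvsim API.

# Reviewer sketch -- not part of the patch.
from enum import Enum
from pathlib import Path

class JobStatus(Enum):
    # Mirrors the new src/dvsim/job/status.py.
    QUEUED = 0
    DISPATCHED = 1
    PASSED = 2
    FAILED = 3
    KILLED = 4

    @property
    def shorthand(self) -> str:
        # Old single-letter code, still handy for compact log lines.
        return self.name[0]

    @property
    def ended(self) -> bool:
        # Replaces membership checks like `status in ["P", "F", "K"]`.
        return self in (JobStatus.PASSED, JobStatus.FAILED, JobStatus.KILLED)

scratch_path = Path("/tmp/scratch")  # hypothetical scratch area
links = {  # keyed by JobStatus instead of "D"/"P"/"F"/"K"
    JobStatus.DISPATCHED: scratch_path / "dispatched",
    JobStatus.PASSED: scratch_path / "passed",
    JobStatus.FAILED: scratch_path / "failed",
    JobStatus.KILLED: scratch_path / "killed",
}

status = JobStatus.PASSED
assert status.shorthand == "P"
assert status.ended and not JobStatus.DISPATCHED.ended
print(links[status])  # /tmp/scratch/passed

One consequence worth noting for review: launchers now start from JobStatus.QUEUED rather than None, so checks such as LsfLauncher.poll() compare against JobStatus.QUEUED explicitly instead of relying on a falsy status value.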