From 41ccfdb533ac206a17c0ba31bac244da048af8c9 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Thu, 29 Jan 2026 14:05:35 +0000 Subject: [PATCH 1/6] refactor: replace assert with RuntimeError Address the `S101` failure with ruff check. Signed-off-by: Alex Jones --- src/dvsim/scheduler.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index c2519799..8d3596bc 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -375,9 +375,10 @@ def _get_successors(self, job_name: str | None = None) -> Sequence[str]: for next_item in self._scheduled[target][cfg]: if job_name is not None: job = self._jobs[next_item] - # Something is terribly wrong if item exists but the - # next_item's dependency list is empty. - assert job.dependencies + if not job.dependencies: + raise RuntimeError( + "Job item exists but the next item's dependency list is empty?" + ) if job_name not in job.dependencies: continue From 232b6f0682e6c56a76f60cb8835201372b37315e Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Thu, 29 Jan 2026 17:27:07 +0000 Subject: [PATCH 2/6] refactor: resolve scheduler complexity lint warnings Break apart a couple of the more complex functions to resolve ruff lint warnings related to method complexity. Also take the opportunity to slightly improve some of the documentation on these functions and to refactor some of the code to be nicer to read. Signed-off-by: Alex Jones --- src/dvsim/scheduler.py | 127 +++++++++++++++++++++++------------------ 1 file changed, 71 insertions(+), 56 deletions(-) diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index 8d3596bc..3932eddf 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -13,6 +13,7 @@ MutableSet, Sequence, ) +from itertools import dropwhile from signal import SIGINT, SIGTERM, signal from types import FrameType from typing import TYPE_CHECKING, Any @@ -318,13 +319,34 @@ def _cancel_successors(self, job_name: str) -> None: self._cancel_item(next_item, cancel_successors=False) items.extend(self._get_successors(next_item)) + def _get_successor_target(self, job_name: str) -> str | None: + """Find the first target in the scheduled list that follows the target of a given job. + + Args: + job_name: name of the job (to find the successor target of). + + Returns: + the successor, or None if no such target exists or there is no successor. + + """ + job: JobSpec = self._jobs[job_name] + + if job.target not in self._scheduled: + msg = f"Scheduler does not contain target {job.target}" + raise KeyError(msg) + + # Find the first target that follows the target in the scheduled list. + target_iter = dropwhile(lambda x: x != job.target, self._scheduled) + next(target_iter, None) + return next(target_iter, None) + def _get_successors(self, job_name: str | None = None) -> Sequence[str]: """Find immediate successors of an item. - We choose the target that follows the 'item''s current target and find - the list of successors whose dependency list contains 'item'. If 'item' - is None, we pick successors from all cfgs, else we pick successors only - from the cfg to which the item belongs. + We choose the target that follows the item's current target and find + the list of successors whose dependency list contains "job_name". If + "job_name" is None, we pick successors from all cfgs, else we pick + successors only from the cfg to which the item belongs. 
Args: job_name: name of the job @@ -334,39 +356,16 @@ def _get_successors(self, job_name: str | None = None) -> Sequence[str]: """ if job_name is None: - target = next(iter(self._scheduled)) - - if target is None: - return [] - - cfgs = set(self._scheduled[target]) - + target = next(iter(self._scheduled), None) + cfgs = set() if target is None else set(self._scheduled[target]) else: + target = self._get_successor_target(job_name) job: JobSpec = self._jobs[job_name] - - if job.target not in self._scheduled: - msg = f"Scheduler does not contain target {job.target}" - raise KeyError(msg) - - target_iterator = iter(self._scheduled) - target = next(target_iterator) - - found = False - while not found: - if target == job.target: - found = True - - try: - target = next(target_iterator) - - except StopIteration: - return [] - - if target is None: - return [] - cfgs = {job.block.name} + if target is None: + return () + # Find item's successors that can be enqueued. We assume here that # only the immediately succeeding target can be enqueued at this # time. @@ -525,8 +524,45 @@ def _poll(self, hms: str) -> bool: return changed + def _dispatch_job(self, hms: str, target: str, job_name: str) -> None: + """Dispatch the named queued job. + + Args: + hms: time as a string formatted in hh:mm:ss + target: the target to dispatch this job to + job_name: the name of the job to dispatch + + """ + try: + self._launchers[job_name].launch() + + except LauncherError: + log.exception("Error launching %s", job_name) + self._kill_item(job_name) + + except LauncherBusyError: + log.exception("Launcher busy") + + self._queued[target].append(job_name) + + log.verbose( + "[%s]: [%s]: [reqeued]: %s", + hms, + target, + job_name, + ) + return + + self._running[target].append(job_name) + self.job_status[job_name] = "D" + def _dispatch(self, hms: str) -> None: - """Dispatch some queued items if possible.""" + """Dispatch some queued items if possible. + + Args: + hms: time as a string formatted in hh:mm:ss + + """ slots = self._launcher_cls.max_parallel - total_sub_items(self._running) if slots <= 0: return @@ -590,28 +626,7 @@ def _dispatch(self, hms: str) -> None: ) for job_name in to_dispatch: - try: - self._launchers[job_name].launch() - - except LauncherError: - log.exception("Error launching %s", job_name) - self._kill_item(job_name) - - except LauncherBusyError: - log.exception("Launcher busy") - - self._queued[target].append(job_name) - - log.verbose( - "[%s]: [%s]: [reqeued]: %s", - hms, - target, - job_name, - ) - continue - - self._running[target].append(job_name) - self.job_status[job_name] = "D" + self._dispatch_job(hms, target, job_name) def _kill(self) -> None: """Kill any running items and cancel any that are waiting.""" From 18d5131e7d419e6364c389d6e07374ca5a2b2887 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Thu, 29 Jan 2026 17:53:27 +0000 Subject: [PATCH 3/6] fix: resolve JobSpec/str type error There may still be more work needed on the typing front but this resolves the singular remaining type error given by pyright within the scheduler. 
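For context, the scheduler tracks in-flight work by job name (e.g.
`self._running[target].append(job_name)`), so the first arm of the union
needs `Sequence[str]` rather than `Sequence[JobSpec]`. A rough sketch of
the shape being summed, with illustrative job names and assuming
`total_sub_items` simply adds up the lengths of the mapped sequences (as
its docstring suggests):

    running: Mapping[str, Sequence[str]] = {
        "build": ["top_build"],
        "run": ["smoke.0", "smoke.1"],
    }
    total_sub_items(running)  # -> 3, used to work out free launcher slots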
Signed-off-by: Alex Jones --- src/dvsim/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index 3932eddf..0ce623b3 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -29,7 +29,7 @@ def total_sub_items( - d: Mapping[str, Sequence[JobSpec]] | Mapping["FlowCfg", Sequence[JobSpec]], + d: Mapping[str, Sequence[str]] | Mapping["FlowCfg", Sequence[JobSpec]], ) -> int: """Return the total number of sub items in a mapping. From 6b260bbc8c2cd7fe70bfddc2219a4d2caa724b5e Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Thu, 5 Feb 2026 17:06:02 +0000 Subject: [PATCH 4/6] fix: Replace 'E' local launcher poll status with a LauncherError This 'E' error launcher status is not well-defined or well-known within the rest of DVSim, but can be seen as meaningfully different from the 'F'/'FAILED' status which describes the dispatched job failing, as this denotes a failure within the launcher. As DVSim currently does not handle this status (almost at-all!) and this is the single occurrence of the 'E' status in the code, we instead convert this into a more appropriate raised `LauncherError`, as in reality this case only happens when calling `poll()` either before `launch()` or after `launch()` already reported some error. Signed-off-by: Alex Jones --- src/dvsim/launcher/local.py | 6 +++++- src/dvsim/scheduler.py | 8 ++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/dvsim/launcher/local.py b/src/dvsim/launcher/local.py index d0f2c14c..48848405 100644 --- a/src/dvsim/launcher/local.py +++ b/src/dvsim/launcher/local.py @@ -113,7 +113,11 @@ def poll(self) -> str | None: must not be called again once it has returned 'P' or 'F'. """ if self._process is None: - return "E" + msg = ( + "poll() was called either before calling launch() or after " + "ignoring a LauncherError from launch()." + ) + raise LauncherError(msg) elapsed_time = datetime.datetime.now() - self.start_time self.job_runtime_secs = elapsed_time.total_seconds() diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index 0ce623b3..effb0a9c 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -477,7 +477,11 @@ def _poll(self, hms: str) -> bool: self._running[target], self._last_item_polled_idx[target], ) - status = self._launchers[job_name].poll() + try: + status = self._launchers[job_name].poll() + except LauncherError as e: + log.error("Error when dispatching target: %s", str(e)) + status = "K" level = log.VERBOSE if status not in ["D", "P", "F", "E", "K"]: @@ -495,7 +499,7 @@ def _poll(self, hms: str) -> bool: level = log.ERROR else: - # Killed or Error dispatching + # Killed, still Queued, or some error when dispatching. self._killed[target].add(job_name) level = log.ERROR From 403977e3a763955f5897618cbc853b81b293c18a Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 30 Jan 2026 12:21:15 +0000 Subject: [PATCH 5/6] refactor: make Job Status an Enum Currently the job status is passed around DVSim as a string, where the expectation is that it is always a single upper case character in ('Q', 'D', 'P', 'F', 'K') - some limited parts of the code also handle an 'E' error case. Turn these into a small Enum type so that we have much stricter typing on these values and improved ergonomics when referring to different job statuses in code. This status Enum lives in its own module instead of with the other job data to avoid circular imports (as the job status' registered callbacks reference the launcher definition). 
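As a rough illustration of the ergonomics this buys (a sketch using only
the helpers defined in the new module, not code lifted from a call site):

    from dvsim.job.status import JobStatus

    status = JobStatus.FAILED

    status == JobStatus.PASSED   # replaces string checks such as == "P"
    status.shorthand             # "F", for compact log and status output
    status.ended                 # True for PASSED, FAILED and KILLED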
Signed-off-by: Alex Jones --- src/dvsim/flow/formal.py | 5 ++-- src/dvsim/flow/one_shot.py | 9 +++--- src/dvsim/job/data.py | 9 +++--- src/dvsim/job/deploy.py | 17 ++++++----- src/dvsim/job/status.py | 29 ++++++++++++++++++ src/dvsim/launcher/base.py | 47 +++++++++++++++--------------- src/dvsim/launcher/fake.py | 15 +++++----- src/dvsim/launcher/local.py | 21 ++++++------- src/dvsim/launcher/lsf.py | 29 +++++++++--------- src/dvsim/launcher/nc.py | 24 ++++++++------- src/dvsim/launcher/sge/launcher.py | 18 +++++++----- src/dvsim/launcher/slurm.py | 16 ++++++---- src/dvsim/scheduler.py | 42 +++++++++++++------------- src/dvsim/sim/flow.py | 9 +++--- src/dvsim/sim_results.py | 5 ++-- 15 files changed, 172 insertions(+), 123 deletions(-) create mode 100644 src/dvsim/job/status.py diff --git a/src/dvsim/flow/formal.py b/src/dvsim/flow/formal.py index 8f196e16..b7bfb70f 100644 --- a/src/dvsim/flow/formal.py +++ b/src/dvsim/flow/formal.py @@ -10,6 +10,7 @@ from dvsim.flow.one_shot import OneShotCfg from dvsim.job.data import CompletedJobStatus +from dvsim.job.status import JobStatus from dvsim.logging import log from dvsim.utils import subst_wildcards @@ -230,7 +231,7 @@ def _gen_results(self, results: Sequence[CompletedJobStatus]) -> None: assert len(self.deploy) == 1 mode = self.deploy[0] - if complete_job.status == "P": + if complete_job.status == JobStatus.PASSED: result_data = Path( subst_wildcards(self.build_dir, {"build_mode": mode.name}), "results.hjson", @@ -262,7 +263,7 @@ def _gen_results(self, results: Sequence[CompletedJobStatus]) -> None: else: summary += ["N/A", "N/A", "N/A"] - if complete_job.status != "P": + if complete_job.status != JobStatus.PASSED: results_str += "\n## List of Failures\n" + "".join(complete_job.fail_msg.message) messages = self.result.get("messages") diff --git a/src/dvsim/flow/one_shot.py b/src/dvsim/flow/one_shot.py index 486015c4..b8d86dd0 100644 --- a/src/dvsim/flow/one_shot.py +++ b/src/dvsim/flow/one_shot.py @@ -12,6 +12,7 @@ from dvsim.flow.base import FlowCfg from dvsim.job.data import CompletedJobStatus from dvsim.job.deploy import CompileOneShot +from dvsim.job.status import JobStatus from dvsim.logging import log from dvsim.modes import BuildMode, Mode from dvsim.utils import rm_path @@ -92,10 +93,10 @@ def _expand(self) -> None: # Set directories with links for ease of debug / triage. 
self.links = { - "D": self.scratch_path + "/" + "dispatched", - "P": self.scratch_path + "/" + "passed", - "F": self.scratch_path + "/" + "failed", - "K": self.scratch_path + "/" + "killed", + JobStatus.DISPATCHED: self.scratch_path + "/" + "dispatched", + JobStatus.PASSED: self.scratch_path + "/" + "passed", + JobStatus.FAILED: self.scratch_path + "/" + "failed", + JobStatus.KILLED: self.scratch_path + "/" + "killed", } # Use the default build mode for tests that do not specify it diff --git a/src/dvsim/job/data.py b/src/dvsim/job/data.py index cd3b42c5..5dda0622 100644 --- a/src/dvsim/job/data.py +++ b/src/dvsim/job/data.py @@ -14,6 +14,7 @@ from pydantic import BaseModel, ConfigDict +from dvsim.job.status import JobStatus from dvsim.launcher.base import ErrorMessage, Launcher from dvsim.report.data import IPMeta, ToolMeta @@ -97,13 +98,13 @@ class JobSpec(BaseModel): """Output directory for the job results files.""" log_path: Path """Path for the job log file.""" - links: Mapping[str, Path] + links: Mapping[JobStatus, Path] """Path for links directories.""" # TODO: remove the need for these callables here pre_launch: Callable[[Launcher], None] """Callback function for pre-launch actions.""" - post_finish: Callable[[str], None] + post_finish: Callable[[JobStatus], None] """Callback function for tidy up actions once the job is finished.""" pass_patterns: Sequence[str] @@ -153,7 +154,7 @@ class CompletedJobStatus(BaseModel): simulated_time: float """Simulation time.""" - status: str - """Job status string [P,F,K,...]""" + status: JobStatus + """Status of the job.""" fail_msg: ErrorMessage """Error message.""" diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index 9a55d8bf..f91cfbd7 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -14,6 +14,7 @@ from tabulate import tabulate from dvsim.job.data import JobSpec, WorkspaceConfig +from dvsim.job.status import JobStatus from dvsim.job.time import JobTime from dvsim.launcher.base import Launcher from dvsim.logging import log @@ -347,10 +348,10 @@ def callback(launcher: Launcher) -> None: return callback - def post_finish(self) -> Callable[[str], None]: + def post_finish(self) -> Callable[[JobStatus], None]: """Get post finish callback.""" - def callback(status: str) -> None: + def callback(status: JobStatus) -> None: """Perform additional post-finish activities (callback). This is invoked by launcher::_post_finish(). @@ -641,12 +642,12 @@ def callback(launcher: Launcher) -> None: return callback - def post_finish(self) -> Callable[[str], None]: + def post_finish(self) -> Callable[[JobStatus], None]: """Get post finish callback.""" - def callback(status: str) -> None: + def callback(status: JobStatus) -> None: """Perform tidy up tasks.""" - if status != "P": + if status != JobStatus.PASSED: # Delete the coverage data if available. rm_path(self.cov_db_test_dir) @@ -812,16 +813,16 @@ def _set_attrs(self) -> None: self.cov_results = "" self.cov_results_dict = {} - def post_finish(self) -> Callable[[str], None]: + def post_finish(self) -> Callable[[JobStatus], None]: """Get post finish callback.""" - def callback(status: str) -> None: + def callback(status: JobStatus) -> None: """Extract the coverage results summary for the dashboard. If the extraction fails, an appropriate exception is raised, which must be caught by the caller to mark the job as a failure. 
""" - if self.dry_run or status != "P": + if self.dry_run or status != JobStatus.PASSED: return plugin = get_sim_tool_plugin(tool=self.sim_cfg.tool) diff --git a/src/dvsim/job/status.py b/src/dvsim/job/status.py new file mode 100644 index 00000000..55dabe39 --- /dev/null +++ b/src/dvsim/job/status.py @@ -0,0 +1,29 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""An enum definition for the various job statuses.""" + +from enum import Enum + +__all__ = ("JobStatus",) + + +class JobStatus(Enum): + """Status of a Job.""" + + QUEUED = 0 + DISPATCHED = 1 + PASSED = 2 + FAILED = 3 + KILLED = 4 + + @property + def shorthand(self) -> str: + """Shorthand for the job status, e.g. 'D' for 'Dispatched'.""" + return self.name[0] + + @property + def ended(self) -> bool: + """Whether this status corresponds to some ended job.""" + return self in (JobStatus.PASSED, JobStatus.FAILED, JobStatus.KILLED) diff --git a/src/dvsim/launcher/base.py b/src/dvsim/launcher/base.py index f361591c..2c15f0d8 100644 --- a/src/dvsim/launcher/base.py +++ b/src/dvsim/launcher/base.py @@ -15,6 +15,7 @@ from pydantic import BaseModel, ConfigDict +from dvsim.job.status import JobStatus from dvsim.job.time import JobTime from dvsim.logging import log from dvsim.tool.utils import get_sim_tool_plugin @@ -210,18 +211,18 @@ def _make_odir(self) -> None: Path(self.job_spec.odir).mkdir(exist_ok=True, parents=True) - def _link_odir(self, status: str) -> None: + def _link_odir(self, status: JobStatus) -> None: """Soft-links the job's directory based on job's status. - The dispatched, passed and failed directories in the scratch area + The DISPATCHED, PASSED and FAILED directories in the scratch area provide a quick way to get to the job that was executed. """ dest = Path(self.job_spec.links[status], self.job_spec.qual_name) mk_symlink(path=self.job_spec.odir, link=dest) # Delete the symlink from dispatched directory if it exists. - if status != "D": - old = Path(self.job_spec.links["D"], self.job_spec.qual_name) + if status != JobStatus.DISPATCHED: + old = Path(self.job_spec.links[JobStatus.DISPATCHED], self.job_spec.qual_name) rm_path(old) def _dump_env_vars(self, exports: Mapping[str, str]) -> None: @@ -258,7 +259,7 @@ def launch(self) -> None: self._do_launch() @abstractmethod - def poll(self) -> str | None: + def poll(self) -> JobStatus | None: """Poll the launched job for completion. Invokes _check_status() and _post_finish() when the job completes. @@ -272,14 +273,14 @@ def poll(self) -> str | None: def kill(self) -> None: """Terminate the job.""" - def _check_status(self) -> tuple[str, ErrorMessage | None]: - """Determine the outcome of the job (P/F if it ran to completion). + def _check_status(self) -> tuple[JobStatus, ErrorMessage | None]: + """Determine the outcome of the job (PASSED/FAILED if it ran to completion). Returns: (status, err_msg) extracted from the log, where the status is - "P" if the it passed, "F" otherwise. This is invoked by poll() just - after the job finishes. err_msg is an instance of the named tuple - ErrorMessage. + PASSED if the job passed, FAILED otherwise. This is invoked by + poll() just after the job finishes. err_msg is an instance of the + named tuple ErrorMessage. 
""" @@ -307,7 +308,7 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None: return None if self.job_spec.dry_run: - return "P", None + return JobStatus.PASSED, None # Only one fail pattern needs to be seen. chk_failed = bool(self.job_spec.fail_patterns) @@ -324,7 +325,7 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None: ) as f: lines = f.readlines() except OSError as e: - return "F", ErrorMessage( + return JobStatus.FAILED, ErrorMessage( line_number=None, message=f"Error opening file {self.job_spec.log_path}:\n{e}", context=[], @@ -368,7 +369,7 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None: # If failed, then nothing else to do. Just return. # Provide some extra lines for context. end = cnt + 5 - return "F", ErrorMessage( + return JobStatus.FAILED, ErrorMessage( line_number=cnt + 1, message=line.strip(), context=lines[cnt:end], @@ -384,32 +385,32 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None: # exit code for whatever reason, then show the last 10 lines of the log # as the failure message, which might help with the debug. if self.exit_code != 0: - return "F", ErrorMessage( + return JobStatus.FAILED, ErrorMessage( line_number=None, message="Job returned non-zero exit code", context=lines[-10:], ) if chk_passed: - return "F", ErrorMessage( + return JobStatus.FAILED, ErrorMessage( line_number=None, message=f"Some pass patterns missing: {pass_patterns}", context=lines[-10:], ) - return "P", None + return JobStatus.PASSED, None - def _post_finish(self, status: str, err_msg: ErrorMessage) -> None: + def _post_finish(self, status: JobStatus, err_msg: ErrorMessage) -> None: """Do post-completion activities, such as preparing the results. Must be invoked by poll(), after the job outcome is determined. Args: - status: status of the job, either 'P', 'F' or 'K'. + status: status of the completed job (must be either PASSED, FAILED or KILLED). err_msg: an instance of the named tuple ErrorMessage. """ - assert status in ["P", "F", "K"] + assert status.ended self._link_odir(status) - log.debug("Item %s has completed execution: %s", self, status) + log.debug("Item %s has completed execution: %s", self, status.shorthand) try: # Run the target-specific cleanup tasks regardless of the job's @@ -419,8 +420,8 @@ def _post_finish(self, status: str, err_msg: ErrorMessage) -> None: except Exception as e: # If the job had already failed, then don't do anything. If it's # cleanup task failed, then mark the job as failed. 
- if status == "P": - status = "F" + if status == JobStatus.PASSED: + status = JobStatus.FAILED err_msg = ErrorMessage( line_number=None, message=f"{e}", @@ -428,7 +429,7 @@ def _post_finish(self, status: str, err_msg: ErrorMessage) -> None: ) self.status = status - if self.status != "P": + if self.status != JobStatus.PASSED: assert err_msg assert isinstance(err_msg, ErrorMessage) self.fail_msg = err_msg diff --git a/src/dvsim/launcher/fake.py b/src/dvsim/launcher/fake.py index 066b31df..7053c0db 100644 --- a/src/dvsim/launcher/fake.py +++ b/src/dvsim/launcher/fake.py @@ -7,6 +7,7 @@ from random import choice from typing import TYPE_CHECKING +from dvsim.job.status import JobStatus from dvsim.launcher.base import ErrorMessage, Launcher if TYPE_CHECKING: @@ -16,12 +17,12 @@ __all__ = ("FakeLauncher",) -def _run_test_handler(job_spec: "JobSpec") -> str: +def _run_test_handler(job_spec: "JobSpec") -> JobStatus: """Handle a RunTest deploy job.""" - return choice(("P", "F")) + return choice((JobStatus.PASSED, JobStatus.FAILED)) -def _cov_report_handler(job_spec: "JobSpec") -> str: +def _cov_report_handler(job_spec: "JobSpec") -> JobStatus: """Handle a CompileSim deploy job.""" # TODO: this hack doesn't work any more and needs implementing by writing # a file that can be parsed as if it's been generated by the tool. @@ -38,7 +39,7 @@ def _cov_report_handler(job_spec: "JobSpec") -> str: # ] # job_spec.cov_results_dict = {k: f"{random() * 100:.2f} %" for k in keys} - return "P" + return JobStatus.PASSED _DEPLOY_HANDLER = { @@ -56,19 +57,19 @@ class FakeLauncher(Launcher): def _do_launch(self) -> None: """Do the launch.""" - def poll(self) -> str | None: + def poll(self) -> JobStatus | None: """Check status of the running process.""" deploy_cls = self.job_spec.job_type if deploy_cls in _DEPLOY_HANDLER: return _DEPLOY_HANDLER[deploy_cls](job_spec=self.job_spec) # Default result is Pass - return "P" + return JobStatus.PASSED def kill(self) -> None: """Kill the running process.""" self._post_finish( - "K", + JobStatus.KILLED, ErrorMessage(line_number=None, message="Job killed!", context=[]), ) diff --git a/src/dvsim/launcher/local.py b/src/dvsim/launcher/local.py index 48848405..74c07eaa 100644 --- a/src/dvsim/launcher/local.py +++ b/src/dvsim/launcher/local.py @@ -10,6 +10,7 @@ import subprocess from typing import TYPE_CHECKING +from dvsim.job.status import JobStatus from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError if TYPE_CHECKING: @@ -100,17 +101,17 @@ def _do_launch(self) -> None: # Wait until the process exit self._process.wait() - self._link_odir("D") + self._link_odir(JobStatus.DISPATCHED) - def poll(self) -> str | None: + def poll(self) -> JobStatus | None: """Check status of the running process. - This returns 'D', 'P', 'F', or 'K'. If 'D', the job is still running. - If 'P', the job finished successfully. If 'F', the job finished with - an error. If 'K' it was killed. + This returns a job status. If DISPATCHED, the job is still running. + If PASSED, the job finished successfully. If FAILED, the job finished + with an error. If KILLED, it was killed. This function must only be called after running self.dispatch_cmd() and - must not be called again once it has returned 'P' or 'F'. + must not be called again once it has returned PASSED or FAILED. 
""" if self._process is None: msg = ( @@ -131,16 +132,16 @@ def poll(self) -> str | None: timeout_mins = self.job_spec.timeout_mins timeout_message = f"Job timed out after {timeout_mins} minutes" self._post_finish( - "K", + JobStatus.KILLED, ErrorMessage( line_number=None, message=timeout_message, context=[timeout_message], ), ) - return "K" + return JobStatus.KILLED - return "D" + return JobStatus.DISPATCHED self.exit_code = self._process.returncode status, err_msg = self._check_status() @@ -172,7 +173,7 @@ def kill(self) -> None: """ self._kill() self._post_finish( - "K", + JobStatus.KILLED, ErrorMessage(line_number=None, message="Job killed!", context=[]), ) diff --git a/src/dvsim/launcher/lsf.py b/src/dvsim/launcher/lsf.py index f177cbc6..167b847e 100644 --- a/src/dvsim/launcher/lsf.py +++ b/src/dvsim/launcher/lsf.py @@ -9,6 +9,7 @@ from pathlib import Path from typing import TYPE_CHECKING, ClassVar +from dvsim.job.status import JobStatus from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError from dvsim.logging import log from dvsim.utils import clean_odirs @@ -270,9 +271,9 @@ def _do_launch(self) -> None: for job in LsfLauncher.jobs[cfg][job_name]: job.bsub_out = Path(f"{job_script}.{job.index}.out") job.job_id = f"{job_id}[{job.index}]" - job._link_odir("D") + job._link_odir(JobStatus.DISPATCHED) - def poll(self) -> str | None: + def poll(self) -> JobStatus | None: """Poll the status of the job. Returns: @@ -286,7 +287,7 @@ def poll(self) -> str | None: if not self.bsub_out_fd: # If job id is not set, the bsub command has not been sent yet. if not self.job_id: - return "D" + return JobStatus.DISPATCHED # We redirect the job's output to the log file, so the job script # output remains empty until the point it finishes. This is a very @@ -295,10 +296,10 @@ def poll(self) -> str | None: # created), then the job is still running. try: if not self.bsub_out.stat().st_size: - return "D" + return JobStatus.DISPATCHED except FileNotFoundError: - return "D" + return JobStatus.DISPATCHED # If we got to this point, we can now open the job script output # file for reading. @@ -307,14 +308,14 @@ def poll(self) -> str | None: except OSError as e: self._post_finish( - "F", + JobStatus.FAILED, ErrorMessage( line_number=None, message=f"ERROR: Failed to open {self.bsub_out}\n{e}.", context=[e], ), ) - return "F" + return JobStatus.FAILED # Now that the job has completed, we need to determine its status. # @@ -360,7 +361,7 @@ def poll(self) -> str | None: # Fail the test if we have reached the max polling retries. if self.num_poll_retries == LsfLauncher.max_poll_retries: self._post_finish( - "F", + JobStatus.FAILED, ErrorMessage( message="ERROR: Reached max retries while " f"reading job script output {self.bsub_out} to determine" @@ -368,9 +369,9 @@ def poll(self) -> str | None: context=[], ), ) - return "F" + return JobStatus.FAILED - return "D" + return JobStatus.DISPATCHED def _get_job_exit_code(self) -> int | None: """Read the job script output to retrieve the exit code. 
@@ -437,14 +438,14 @@ def kill(self) -> None: else: log.error("Job ID for %s not found", self.job_spec.full_name) - self._post_finish("K", ErrorMessage(message="Job killed!", context=[])) + self._post_finish(JobStatus.KILLED, ErrorMessage(message="Job killed!", context=[])) def _post_finish(self, status: str, err_msg: ErrorMessage) -> None: """Tidy up after the job is complete.""" if self.bsub_out_fd: self.bsub_out_fd.close() if self.exit_code is None: - self.exit_code = 0 if status == "P" else 1 + self.exit_code = 0 if status == JobStatus.PASSED else 1 super()._post_finish(status, err_msg) @staticmethod @@ -453,7 +454,7 @@ def _post_finish_job_array( job_name: str, err_msg: str, ) -> None: - """On LSF error, mark all jobs in this array as killed. + """On LSF error, mark all jobs in this array as FAILED. Args: cfg: workspace configuration @@ -463,6 +464,6 @@ def _post_finish_job_array( """ for job in LsfLauncher.jobs[cfg.project][job_name]: job._post_finish( - "F", + JobStatus.FAILED, ErrorMessage(line_number=None, message=err_msg, context=[err_msg]), ) diff --git a/src/dvsim/launcher/nc.py b/src/dvsim/launcher/nc.py index c4a41496..6f1d7a8f 100644 --- a/src/dvsim/launcher/nc.py +++ b/src/dvsim/launcher/nc.py @@ -11,6 +11,7 @@ import sys from typing import TYPE_CHECKING +from dvsim.job.status import JobStatus from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError from dvsim.logging import log from dvsim.utils import rm_path @@ -162,20 +163,20 @@ def _do_launch(self) -> None: finally: self._close_process() - self._link_odir("D") + self._link_odir(JobStatus.DISPATCHED) def minutes_since_start(self): return (datetime.datetime.now() - self.start_time).total_seconds() / 60 - def poll(self): + def poll(self) -> JobStatus | None: """Check status of the running process. - This returns 'D', 'P', 'F', or 'K'. If 'D', the job is still running. - If 'P', the job finished successfully. If 'F', the job finished with - an error. If 'K' it was killed. + This returns a job status. If DISPATCHED, the job is still running. + If PASSED, the job finished successfully. If FAILED, the job finished + with an error. If KILLED, it was killed. This function must only be called after running self.dispatch_cmd() and - must not be called again once it has returned 'P' or 'F'. + must not be called again once it has returned PASSED or FAILED. 
""" assert self.process is not None if self.process.poll() is None: @@ -215,16 +216,15 @@ def poll(self): elif self.nc_job_state == "wait_timeout": timeout_message = f"Job timed out after waiting {wait_timeout_mins} mins" self._post_finish( - "K", + JobStatus.KILLED, ErrorMessage( line_number=None, message=timeout_message, context=[timeout_message], ), ) - return "K" - return "D" - return "D" + return JobStatus.KILLED + return JobStatus.DISPATCHED self.exit_code = self.process.returncode status, err_msg = self._check_status() @@ -255,7 +255,9 @@ def kill(self) -> None: """ self._kill() - self._post_finish("K", ErrorMessage(line_number=None, message="Job killed!", context=[])) + self._post_finish( + JobStatus.KILLED, ErrorMessage(line_number=None, message="Job killed!", context=[]) + ) def _post_finish(self, status, err_msg) -> None: self._close_process() diff --git a/src/dvsim/launcher/sge/launcher.py b/src/dvsim/launcher/sge/launcher.py index 2eab0228..8559788b 100644 --- a/src/dvsim/launcher/sge/launcher.py +++ b/src/dvsim/launcher/sge/launcher.py @@ -11,6 +11,7 @@ from subprocess import PIPE, Popen from typing import TYPE_CHECKING +from dvsim.job.status import JobStatus from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError from dvsim.launcher.sge.engine import * # noqa: F403 @@ -93,21 +94,22 @@ def _do_launch(self) -> None: finally: self._close_process() - self._link_odir("D") + self._link_odir(JobStatus.DISPATCHED) f.close() - def poll(self) -> str: + def poll(self) -> JobStatus | None: """Check status of the running process. - This returns 'D', 'P' or 'F'. If 'D', the job is still running. If 'P', - the job finished successfully. If 'F', the job finished with an error. + This returns a job status. If DISPATCHED, the job is still running. + If PASSED, the job finished successfully. If FAILED, the job finished + with an error. If KILLED, it was killed. This function must only be called after running self.dispatch_cmd() and - must not be called again once it has returned 'P' or 'F'. + must not be called again once it has returned PASSED or FAILED. """ assert self.process is not None if self.process.poll() is None: - return "D" + return JobStatus.DISPATCHED # ------------------------------------- # copy SGE job results to log file sge_log_path = Path(f"{self.job_spec.log_path}.sge") @@ -157,7 +159,9 @@ def kill(self) -> None: output = process.communicate()[0].decode("utf-8") output = output.rstrip("\n") # ---------------------------- - self._post_finish("K", ErrorMessage(line_number=None, message="Job killed!", context=[])) + self._post_finish( + JobStatus.KILLED, ErrorMessage(line_number=None, message="Job killed!", context=[]) + ) def _post_finish(self, status, err_msg) -> None: super()._post_finish(status, err_msg) diff --git a/src/dvsim/launcher/slurm.py b/src/dvsim/launcher/slurm.py index deab4bc5..7e966ae7 100644 --- a/src/dvsim/launcher/slurm.py +++ b/src/dvsim/launcher/slurm.py @@ -9,6 +9,7 @@ import subprocess from typing import TYPE_CHECKING +from dvsim.job.status import JobStatus from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError from dvsim.logging import log @@ -85,20 +86,21 @@ def _do_launch(self) -> None: finally: self._close_process() - self._link_odir("D") + self._link_odir(JobStatus.DISPATCHED) def poll(self): """Check status of the running process. - This returns 'D', 'P' or 'F'. If 'D', the job is still running. If 'P', - the job finished successfully. If 'F', the job finished with an error. + This returns a job status. 
If DISPATCHED, the job is still running. + If PASSED, the job finished successfully. If FAILED, the job finished + with an error. If KILLED, it was killed. This function must only be called after running self.dispatch_cmd() and - must not be called again once it has returned 'P' or 'F'. + must not be called again once it has returned PASSED or FAILED. """ assert self.process is not None if self.process.poll() is None: - return "D" + return JobStatus.DISPATCHED # Copy slurm job results to log file if pathlib.Path(self.slurm_log_file).exists(): @@ -138,7 +140,9 @@ def kill(self) -> None: self.process.wait(timeout=2) except subprocess.TimeoutExpired: self.process.kill() - self._post_finish("K", ErrorMessage(line_number=None, message="Job killed!", context=[])) + self._post_finish( + JobStatus.KILLED, ErrorMessage(line_number=None, message="Job killed!", context=[]) + ) def _post_finish(self, status, err_msg) -> None: super()._post_finish(status, err_msg) diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index effb0a9c..28de4851 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any from dvsim.job.data import CompletedJobStatus, JobSpec +from dvsim.job.status import JobStatus from dvsim.launcher.base import Launcher, LauncherBusyError, LauncherError from dvsim.logging import log from dvsim.utils.status_printer import get_status_printer @@ -152,10 +153,9 @@ def __init__( self._status_printer.init_target(target=target, msg=msg) # A map from the job names tracked by this class to their - # current status. This status is 'Q', 'D', 'P', 'F' or 'K', - # corresponding to membership in the dicts above. This is not - # per-target. - self.job_status: MutableMapping[str, str] = {} + # current status, corresponding to membership in the dicts + # above. This is not per-target. + self.job_status: MutableMapping[str, JobStatus] = {} # Create the launcher instance for all items. self._launchers: Mapping[str, Launcher] = { @@ -300,7 +300,7 @@ def _enqueue_successors(self, job_name: str | None = None) -> None: msg = f"Job {next_job_name} already scheduled" raise RuntimeError(msg) - self.job_status[next_job_name] = "Q" + self.job_status[next_job_name] = JobStatus.QUEUED self._queued[target].append(next_job_name) self._unschedule_item(next_job_name) @@ -406,7 +406,7 @@ def _ok_to_enqueue(self, job_name: str) -> bool: return False # Has the dep completed? - if self.job_status[dep] not in ["P", "F", "K"]: + if not self.job_status[dep].ended: return False return True @@ -434,14 +434,14 @@ def _ok_to_run(self, job_name: str) -> bool: continue dep_status = self.job_status[dep_name] - if dep_status not in ["P", "F", "K"]: - raise ValueError("Status must be one of P, F, or K") + if not dep_status.ended: + msg = f"Expected dependent job {dep_name} to be ended, not {dep_status.name}." 
+ raise ValueError(msg) if job.needs_all_dependencies_passing: - if dep_status in ["F", "K"]: + if dep_status != JobStatus.PASSED: return False - - elif dep_status in ["P"]: + elif dep_status == JobStatus.PASSED: return True return job.needs_all_dependencies_passing @@ -481,20 +481,20 @@ def _poll(self, hms: str) -> bool: status = self._launchers[job_name].poll() except LauncherError as e: log.error("Error when dispatching target: %s", str(e)) - status = "K" + status = JobStatus.KILLED level = log.VERBOSE - if status not in ["D", "P", "F", "E", "K"]: - msg = f"Status must be one of D, P, F, E or K but found {status}" + if status is None: + msg = "Expected a valid job status but got None instead." raise ValueError(msg) - if status == "D": + if status == JobStatus.DISPATCHED: continue - if status == "P": + if status == JobStatus.PASSED: self._passed[target].add(job_name) - elif status == "F": + elif status == JobStatus.FAILED: self._failed[target].add(job_name) level = log.ERROR @@ -513,7 +513,7 @@ def _poll(self, hms: str) -> bool: hms, target, job_name, - status, + status.shorthand, ) # Enqueue item's successors regardless of its status. @@ -558,7 +558,7 @@ def _dispatch_job(self, hms: str, target: str, job_name: str) -> None: return self._running[target].append(job_name) - self.job_status[job_name] = "D" + self.job_status[job_name] = JobStatus.DISPATCHED def _dispatch(self, hms: str) -> None: """Dispatch some queued items if possible. @@ -700,7 +700,7 @@ def _cancel_item(self, job_name: str, *, cancel_successors: bool = True) -> None """ target = self._jobs[job_name].target - self.job_status[job_name] = "K" + self.job_status[job_name] = JobStatus.KILLED self._killed[target].add(job_name) if job_name in self._queued[target]: self._queued[target].remove(job_name) @@ -719,7 +719,7 @@ def _kill_item(self, job_name: str) -> None: """ target = self._jobs[job_name].target self._launchers[job_name].kill() - self.job_status[job_name] = "K" + self.job_status[job_name] = JobStatus.KILLED self._killed[target].add(job_name) self._running[target].remove(job_name) self._cancel_successors(job_name) diff --git a/src/dvsim/sim/flow.py b/src/dvsim/sim/flow.py index d9e3f86c..c5fd3a06 100644 --- a/src/dvsim/sim/flow.py +++ b/src/dvsim/sim/flow.py @@ -22,6 +22,7 @@ CovUnr, RunTest, ) +from dvsim.job.status import JobStatus from dvsim.logging import log from dvsim.modes import BuildMode, Mode, RunMode, find_mode from dvsim.regression import Regression @@ -214,10 +215,10 @@ def _expand(self) -> None: # Set directories with links for ease of debug / triage. 
self.links = { - "D": self.scratch_path + "/" + "dispatched", - "P": self.scratch_path + "/" + "passed", - "F": self.scratch_path + "/" + "failed", - "K": self.scratch_path + "/" + "killed", + JobStatus.DISPATCHED: self.scratch_path + "/" + "dispatched", + JobStatus.PASSED: self.scratch_path + "/" + "passed", + JobStatus.FAILED: self.scratch_path + "/" + "failed", + JobStatus.KILLED: self.scratch_path + "/" + "killed", } # Use the default build mode for tests that do not specify it diff --git a/src/dvsim/sim_results.py b/src/dvsim/sim_results.py index c9a3c6ac..9c753482 100644 --- a/src/dvsim/sim_results.py +++ b/src/dvsim/sim_results.py @@ -10,6 +10,7 @@ from pydantic import BaseModel, ConfigDict +from dvsim.job.status import JobStatus from dvsim.testplan import Result if TYPE_CHECKING: @@ -124,7 +125,7 @@ def from_job_status(results: Sequence["CompletedJobStatus"]) -> "BucketedFailure buckets = {} for job_status in results: - if job_status.status in ["F", "K"]: + if job_status.status in (JobStatus.FAILED, JobStatus.KILLED): bucket = _bucketize(job_status.fail_msg.message) if bucket not in buckets: @@ -186,6 +187,6 @@ def _add_run(self, job_status: "CompletedJobStatus") -> None: row.job_runtime = job_status.job_runtime row.simulated_time = job_status.simulated_time - if job_status.status == "P": + if job_status.status == JobStatus.PASSED: row.passing += 1 row.total += 1 From 931f4b797925592a5c317ab5d78fecc13ebc558b Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 6 Feb 2026 12:32:12 +0000 Subject: [PATCH 6/6] refactor: Launcher poll always returns a status Previously these methods all had to account for the potential that they could be returning the uninitialized status, which was `None`. Default instead to `Queued` which means that the job is ready but not yet dispatched. In reality this `status` variable is a bit messy; it is defined on the base class as `None` and only set in `_post_finish` with the given status. It is only used by `_post_finish` itself (after being set) and is otherwise only directly used/returned by the `local`, `lsf` and `nc` launchers, each of which do not set the value. So: - It initializes as `None`. - The `local` and `nc` launcher only use it after invoking `post_finish`, so it is definitely a status then. - The `lsf` launcher only returns the status if it has a not-None status, i.e. if it already finished. So in all of these cases, `status` is only used after being set, it is just being initialized as `None` and the type propagates. Rather than perform typing casts or checks everywhere, lets just change this default value to be the `Queued` status. A larger refactor is likely needed in the future Signed-off-by: Alex Jones --- src/dvsim/launcher/base.py | 6 +++--- src/dvsim/launcher/fake.py | 2 +- src/dvsim/launcher/local.py | 2 +- src/dvsim/launcher/lsf.py | 6 +++--- src/dvsim/launcher/nc.py | 2 +- src/dvsim/launcher/sge/launcher.py | 2 +- src/dvsim/launcher/slurm.py | 2 +- src/dvsim/scheduler.py | 4 ---- 8 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/dvsim/launcher/base.py b/src/dvsim/launcher/base.py index 2c15f0d8..f1cddb03 100644 --- a/src/dvsim/launcher/base.py +++ b/src/dvsim/launcher/base.py @@ -119,7 +119,7 @@ def __init__(self, job_spec: "JobSpec") -> None: # _check_status() method, but eventually updated by the _post_finish() # method, in case any of the cleanup tasks fails. This value is finally # returned to the Scheduler by the poll() method. 
- self.status = None + self.status = JobStatus.QUEUED # Return status of the process running the job. self.exit_code = None @@ -259,13 +259,13 @@ def launch(self) -> None: self._do_launch() @abstractmethod - def poll(self) -> JobStatus | None: + def poll(self) -> JobStatus: """Poll the launched job for completion. Invokes _check_status() and _post_finish() when the job completes. Returns: - status of the job or None + status of the job """ diff --git a/src/dvsim/launcher/fake.py b/src/dvsim/launcher/fake.py index 7053c0db..d64fcb86 100644 --- a/src/dvsim/launcher/fake.py +++ b/src/dvsim/launcher/fake.py @@ -57,7 +57,7 @@ class FakeLauncher(Launcher): def _do_launch(self) -> None: """Do the launch.""" - def poll(self) -> JobStatus | None: + def poll(self) -> JobStatus: """Check status of the running process.""" deploy_cls = self.job_spec.job_type if deploy_cls in _DEPLOY_HANDLER: diff --git a/src/dvsim/launcher/local.py b/src/dvsim/launcher/local.py index 74c07eaa..8132844b 100644 --- a/src/dvsim/launcher/local.py +++ b/src/dvsim/launcher/local.py @@ -103,7 +103,7 @@ def _do_launch(self) -> None: self._link_odir(JobStatus.DISPATCHED) - def poll(self) -> JobStatus | None: + def poll(self) -> JobStatus: """Check status of the running process. This returns a job status. If DISPATCHED, the job is still running. diff --git a/src/dvsim/launcher/lsf.py b/src/dvsim/launcher/lsf.py index 167b847e..ec75ddc0 100644 --- a/src/dvsim/launcher/lsf.py +++ b/src/dvsim/launcher/lsf.py @@ -273,15 +273,15 @@ def _do_launch(self) -> None: job.job_id = f"{job_id}[{job.index}]" job._link_odir(JobStatus.DISPATCHED) - def poll(self) -> JobStatus | None: + def poll(self) -> JobStatus: """Poll the status of the job. Returns: - status of the job or None + status of the job """ # It is possible we may have determined the status already. - if self.status: + if self.status is not JobStatus.QUEUED: return self.status if not self.bsub_out_fd: diff --git a/src/dvsim/launcher/nc.py b/src/dvsim/launcher/nc.py index 6f1d7a8f..9c326032 100644 --- a/src/dvsim/launcher/nc.py +++ b/src/dvsim/launcher/nc.py @@ -168,7 +168,7 @@ def _do_launch(self) -> None: def minutes_since_start(self): return (datetime.datetime.now() - self.start_time).total_seconds() / 60 - def poll(self) -> JobStatus | None: + def poll(self) -> JobStatus: """Check status of the running process. This returns a job status. If DISPATCHED, the job is still running. diff --git a/src/dvsim/launcher/sge/launcher.py b/src/dvsim/launcher/sge/launcher.py index 8559788b..24411fd6 100644 --- a/src/dvsim/launcher/sge/launcher.py +++ b/src/dvsim/launcher/sge/launcher.py @@ -97,7 +97,7 @@ def _do_launch(self) -> None: self._link_odir(JobStatus.DISPATCHED) f.close() - def poll(self) -> JobStatus | None: + def poll(self) -> JobStatus: """Check status of the running process. This returns a job status. If DISPATCHED, the job is still running. diff --git a/src/dvsim/launcher/slurm.py b/src/dvsim/launcher/slurm.py index 7e966ae7..218d95c7 100644 --- a/src/dvsim/launcher/slurm.py +++ b/src/dvsim/launcher/slurm.py @@ -88,7 +88,7 @@ def _do_launch(self) -> None: self._link_odir(JobStatus.DISPATCHED) - def poll(self): + def poll(self) -> JobStatus: """Check status of the running process. This returns a job status. If DISPATCHED, the job is still running. 
diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index 28de4851..ee1f99ed 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -484,10 +484,6 @@ def _poll(self, hms: str) -> bool: status = JobStatus.KILLED level = log.VERBOSE - if status is None: - msg = "Expected a valid job status but got None instead." - raise ValueError(msg) - if status == JobStatus.DISPATCHED: continue
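
With the `None` case gone, the scheduler's `_poll()` handling reduces to a
straight branch on the returned status. A minimal sketch of the resulting
flow (condensed from the polling loop in this series, not a verbatim copy):

    status = self._launchers[job_name].poll()   # always a JobStatus now

    if status == JobStatus.DISPATCHED:
        continue                                # still running; poll again

    if status == JobStatus.PASSED:
        self._passed[target].add(job_name)
    elif status == JobStatus.FAILED:
        self._failed[target].add(job_name)
    else:
        self._killed[target].add(job_name)      # KILLED, incl. launcher errors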