Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## Version 0.6.2
## Version 0.7.0
- Added passive support for array jobs. In the output of `qq jobs` and `qq stat`, individual sub-jobs are displayed for all array jobs.
- Added autocomplete for script name in `qq submit` and `qq shebang`.
- Some rewordings.

***

## Version 0.6.2
- The operation for obtaining the list of working nodes at job start is now retried, potentially decreasing the number of failures on unstable systems (like Metacentrum).

## Version 0.6.1
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ dev = [
"pytest-cov>=7.0.0",
"ruff>=0.13.0",
"snakeviz>=2.2.2",
"ty>=0.0.1a24",
"ty>=0.0.18",
]


Expand Down
15 changes: 13 additions & 2 deletions src/qq_lib/batch/interface/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@


from abc import ABC, abstractmethod
from collections.abc import Sequence
from datetime import datetime, timedelta
from pathlib import Path
from typing import Self
Expand Down Expand Up @@ -331,15 +332,15 @@ def toYaml(self) -> str:
pass

@abstractmethod
def getSteps(self) -> list[Self]:
def getSteps(self) -> Sequence[Self]:
"""
Return a list of steps associated with this job.

Note that job step is represented by BatchJobInterface, but
may not contain all the values that a proper BatchJobInterface contains.

Returns:
list[BatchJobInterface] | None: List of job steps. An empty list if there are none.
Sequence[BatchJobInterface]: List of job steps. An empty list if there are none.
"""
pass

Expand All @@ -352,3 +353,13 @@ def getStepId(self) -> str | None:
str | None: Job step index or `None` if this is not a job step.
"""
pass

@abstractmethod
def isArrayJob(self) -> bool:
"""
Return `True` if the job is a top-level array job (not a sub-job).

Returns:
bool: `True` if the job is a top-level array job, else `False`.
"""
pass
4 changes: 2 additions & 2 deletions src/qq_lib/batch/interface/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __str__(cls: type[BatchInterface]) -> str:
return cls.envName()

@classmethod
def register(cls, batch_cls: type[BatchInterface]) -> None:
def registerBatchSystem(cls, batch_cls: type[BatchInterface]) -> None:
"""
Register a batch system class in the metaclass registry.

Expand Down Expand Up @@ -132,5 +132,5 @@ def batch_system(cls):

Has to be added to every implementation of `BatchInterface`.
"""
BatchMeta.register(cls)
BatchMeta.registerBatchSystem(cls)
return cls
9 changes: 5 additions & 4 deletions src/qq_lib/batch/pbs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def parse_multi_pbs_dump_to_dictionaries(

Args:
text (str): The raw PBS dump containing information about one or more queues/jobs/nodes.
keyword (str): Keyword identifying the start of a metadata block.
keyword (str | None): Keyword identifying the start of a metadata block.
If `None`, the first line is treated as identifier.

Returns:
list[tuple[dict[str, str], str]]: A list of tuples, each containing:
Expand Down Expand Up @@ -68,14 +69,14 @@ def parse_multi_pbs_dump_to_dictionaries(

if not block:
# extract the identifier
if keyword:
m = pattern.match(line) # ty: ignore[possibly-missing-attribute]
if pattern:
m = pattern.match(line)
if not m:
raise QQError(
f"Invalid PBS dump format. Could not extract identifier from:\n{line}"
)
identifier = m.group(1).strip()
# if keyword is not specified, used the first line as the identifier
# if keyword is not specified, use the first line as the identifier
else:
identifier = line.strip()
block.append(line)
Expand Down
17 changes: 15 additions & 2 deletions src/qq_lib/batch/pbs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@

import re
import subprocess
from collections.abc import Sequence
from datetime import datetime, timedelta
from pathlib import Path
from typing import Self

import yaml

from qq_lib.batch.interface import BatchJobInterface
from qq_lib.batch.pbs.common import parse_pbs_dump_to_dictionary
from qq_lib.batch.pbs.common import (
parse_pbs_dump_to_dictionary,
)
from qq_lib.core.common import hhmmss_to_duration, load_yaml_dumper
from qq_lib.core.config import CFG
from qq_lib.core.error import QQError
Expand Down Expand Up @@ -71,6 +74,11 @@ def getState(self) -> BatchState:
if not (state := self._info.get("job_state")):
return BatchState.UNKNOWN

# X is used by PBS to indicate finished tasks in unfinished array jobs,
# but qq uses X to indicate failure
if state == "X":
state = "F"

# if the job is finished and the return code is not zero, return FAILED
if state == "F":
exit_code = self.getExitCode()
Expand Down Expand Up @@ -289,14 +297,19 @@ def toYaml(self) -> str:
to_dump, default_flow_style=False, sort_keys=False, Dumper=Dumper
)

def getSteps(self) -> list[Self]:
def getSteps(self) -> Sequence[Self]:
# not available for PBS
return []

def getStepId(self) -> str | None:
# no job steps for PBS
return None

def isArrayJob(self) -> bool:
return (
array := self._info.get("array")
) is not None and array.lower() == "true"

@classmethod
def fromDict(cls, job_id: str, info: dict[str, str]) -> Self:
"""
Expand Down
17 changes: 11 additions & 6 deletions src/qq_lib/batch/pbs/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,29 +150,29 @@ def jobKillForce(cls, job_id: str) -> None:

@classmethod
def getBatchJob(cls, job_id: str) -> PBSJob:
return PBSJob(job_id) # ty: ignore[invalid-return-type]
return PBSJob(job_id)

@classmethod
def getUnfinishedBatchJobs(cls, user: str) -> list[PBSJob]:
command = f"qstat -fwu {user}"
command = f"qstat -fwtu {user}"
logger.debug(command)
return cls._getBatchJobsUsingCommand(command)

@classmethod
def getBatchJobs(cls, user: str) -> list[PBSJob]:
command = f"qstat -fwxu {user}"
command = f"qstat -fwxtu {user}"
logger.debug(command)
return cls._getBatchJobsUsingCommand(command)

@classmethod
def getAllUnfinishedBatchJobs(cls) -> list[PBSJob]:
command = "qstat -fw"
command = "qstat -fwt"
logger.debug(command)
return cls._getBatchJobsUsingCommand(command)

@classmethod
def getAllBatchJobs(cls) -> list[PBSJob]:
command = "qstat -fxw"
command = "qstat -fxwt"
logger.debug(command)
return cls._getBatchJobsUsingCommand(command)

Expand Down Expand Up @@ -857,6 +857,11 @@ def _getBatchJobsUsingCommand(cls, command: str) -> list[PBSJob]:
for data, job_id in parse_multi_pbs_dump_to_dictionaries(
result.stdout.strip(), "Job Id"
):
jobs.append(PBSJob.fromDict(job_id, data))
# ignore top-level array jobs
job = PBSJob.fromDict(job_id, data)
if job.isArrayJob():
continue

jobs.append(job)

return jobs
10 changes: 4 additions & 6 deletions src/qq_lib/batch/pbs/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,11 @@ def _setJobNumbers(self) -> None:
If parsing fails or the field is missing, `_job_numbers` is set to an empty dictionary.
"""
if not (state_count := self._info.get("state_count")):
self._job_numbers = {}
self._job_numbers: dict[str, str] = {}
return

try:
self._job_numbers = {
k: int(v)
for k, v in (p.split(":") for p in state_count.split()) # ty: ignore[possibly-missing-attribute]
}
self._job_numbers = dict(p.split(":") for p in state_count.split())
except Exception as e:
logger.warning(f"Could not get job counts for queue '{self._name}': {e}.")
self._job_numbers = {}
self._job_numbers: dict[str, str] = {}
10 changes: 7 additions & 3 deletions src/qq_lib/batch/slurm/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import re
import subprocess
from collections.abc import Sequence
from datetime import datetime, timedelta
from pathlib import Path
from typing import Self
Expand Down Expand Up @@ -307,7 +308,7 @@ def toYaml(self) -> str:
self._info, default_flow_style=False, sort_keys=False, Dumper=Dumper
)

def getSteps(self) -> list[Self]:
def getSteps(self) -> Sequence[Self]:
command = f"sacct -j {self._job_id} --parsable2 --format={SACCT_STEP_FIELDS}"
logger.debug(command)

Expand Down Expand Up @@ -343,6 +344,9 @@ def getStepId(self) -> str | None:
except ValueError:
return None

def isArrayJob(self) -> bool:
return False

@classmethod
def fromDict(cls, job_id: str, info: dict[str, str]) -> Self:
"""
Expand Down Expand Up @@ -416,7 +420,7 @@ def fromSacctString(cls, string: str) -> Self:
SlurmJob._assignIfAllocated(info, "AllocCPUs", "ReqCPUs", "NumCPUs")
SlurmJob._assignIfAllocated(info, "AllocNodes", "ReqNodes", "NumNodes")

return SlurmJob.fromDict(info["JobId"], info)
return cls.fromDict(info["JobId"], info)

@classmethod
def _stepFromSacctString(cls, string: str) -> Self:
Expand Down Expand Up @@ -448,7 +452,7 @@ def _stepFromSacctString(cls, string: str) -> Self:
# other words may contain useless additional information
info["JobState"] = info["JobState"].split()[0]

return SlurmJob.fromDict(info["JobId"], info)
return cls.fromDict(info["JobId"], info)

def getIdsForSorting(self) -> list[int]:
"""
Expand Down
18 changes: 9 additions & 9 deletions src/qq_lib/batch/slurm/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,18 @@ def jobKillForce(cls, job_id: str) -> None:

@classmethod
def getBatchJob(cls, job_id: str) -> SlurmJob:
return SlurmJob(job_id) # ty: ignore[invalid-return-type]
return SlurmJob(job_id)

@classmethod
def getUnfinishedBatchJobs(cls, user: str) -> list[SlurmJob]:
# get running jobs from sacct (faster than using squeue and scontrol)
command = f"sacct -u {user} --state RUNNING --allocations --noheader --parsable2 --format={SACCT_FIELDS}"
command = f"sacct -u {user} --state RUNNING --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
logger.debug(command)

sacct_jobs = cls._getBatchJobsUsingSacctCommand(command)

# get pending jobs using squeue
command = f'squeue -u {user} -t PENDING -h -o "%i"'
command = f'squeue -u {user} --array -t PENDING -h -o "%i"'
logger.debug(command)

squeue_jobs = cls._getBatchJobsUsingSqueueCommand(command)
Expand All @@ -147,13 +147,13 @@ def getUnfinishedBatchJobs(cls, user: str) -> list[SlurmJob]:
@classmethod
def getBatchJobs(cls, user: str) -> list[SlurmJob]:
# get all jobs, except pending which are not available from sacct
command = f"sacct -u {user} --allocations --noheader --parsable2 --format={SACCT_FIELDS}"
command = f"sacct -u {user} --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
logger.debug(command)

sacct_jobs = cls._getBatchJobsUsingSacctCommand(command)

# get pending jobs using squeue
command = f'squeue -u {user} -t PENDING -h -o "%i"'
command = f'squeue -u {user} --array -t PENDING -h -o "%i"'
logger.debug(command)

squeue_jobs = cls._getBatchJobsUsingSqueueCommand(command)
Expand All @@ -165,13 +165,13 @@ def getBatchJobs(cls, user: str) -> list[SlurmJob]:
@classmethod
def getAllUnfinishedBatchJobs(cls) -> list[SlurmJob]:
# get running jobs using sacct (faster than using squeue and scontrol)
command = f"sacct --state RUNNING --allusers --allocations --noheader --parsable2 --format={SACCT_FIELDS}"
command = f"sacct --state RUNNING --allusers --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
logger.debug(command)

sacct_jobs = cls._getBatchJobsUsingSacctCommand(command)

# get pending jobs using squeue
command = 'squeue -t PENDING -h -o "%i"'
command = 'squeue --array -t PENDING -h -o "%i"'
logger.debug(command)

squeue_jobs = cls._getBatchJobsUsingSqueueCommand(command)
Expand All @@ -183,13 +183,13 @@ def getAllUnfinishedBatchJobs(cls) -> list[SlurmJob]:
@classmethod
def getAllBatchJobs(cls) -> list[SlurmJob]:
# get all jobs, except pending which are not available from sacct
command = f"sacct --allusers --allocations --noheader --parsable2 --format={SACCT_FIELDS}"
command = f"sacct --allusers --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
logger.debug(command)

sacct_jobs = cls._getBatchJobsUsingSacctCommand(command)

# get pending jobs using squeue
command = 'squeue -t PENDING -h -o "%i"'
command = 'squeue --array -t PENDING -h -o "%i"'
logger.debug(command)

squeue_jobs = cls._getBatchJobsUsingSqueueCommand(command)
Expand Down
2 changes: 1 addition & 1 deletion src/qq_lib/clear/clearer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def clear(self, force: bool = False) -> None:
)
if excluded:
logger.info(
f"{len(excluded)} qq files were not safe to clear. Rerun as '{CFG.binary_name} clear --force' to clear them forcibly."
f"{len(excluded)} qq files could not be safely cleared. Rerun as '{CFG.binary_name} clear --force' to clear them forcibly."
)

def _collectRunTimeFiles(self) -> set[Path]:
Expand Down
4 changes: 2 additions & 2 deletions src/qq_lib/core/click_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def write_heading(self, heading: str) -> None:

def write_usage(
self, prog_name: str, args: str | None, prefix: str | None = None
) -> None:
) -> None: # ty: ignore[invalid-method-override]
"""Override to make Usage: header bold"""
if prefix is None:
prefix = "Usage:"
Expand All @@ -50,7 +50,7 @@ def write_dl(
rows: Sequence[tuple[str, str | None]],
_col_max: int = 30,
_col_spacing: int = 2,
) -> None:
) -> None: # ty: ignore[invalid-method-override]
for term, definition in rows:
colored_term = click.style(term, fg=self.options_color, bold=True)
self.write(f" {colored_term}\n")
Expand Down
4 changes: 2 additions & 2 deletions src/qq_lib/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
def load_yaml_dumper() -> type[yaml.Dumper]:
"""Return the fastest available YAML dumper (CDumper if possible)."""
try:
from yaml import CDumper as Dumper # type: ignore[attr-defined]
from yaml import CDumper as Dumper

logger.debug("Loaded YAML CDumper.")
except ImportError:
Expand All @@ -45,7 +45,7 @@ def load_yaml_loader() -> type[yaml.SafeLoader]:
"""Return the fastest available safe YAML loader (CSafeLoader if possible)."""
try:
from yaml import (
CSafeLoader as SafeLoader, # ty: ignore[possibly-missing-import]
CSafeLoader as SafeLoader,
)

logger.debug("Loaded YAML CLoader.")
Expand Down
4 changes: 2 additions & 2 deletions src/qq_lib/core/repeater.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def onException(
- BaseException: The caught exception instance.
- Repeater: Reference to this `Repeater` instance.
"""
self._handlers[exc_type] = handler # ty: ignore[invalid-assignment]
self._handlers[exc_type] = handler

def run(self) -> None:
"""
Expand All @@ -87,4 +87,4 @@ def run(self) -> None:
except tuple(self._handlers.keys()) as e:
self.encountered_errors[i] = e
handler = self._handlers[type(e)]
handler(e, self) # ty: ignore[invalid-argument-type]
handler(e, self)
Loading
Loading