-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmulti-kill
More file actions
executable file
·170 lines (134 loc) · 4.88 KB
/
multi-kill
File metadata and controls
executable file
·170 lines (134 loc) · 4.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#!/usr/bin/env -S uv run --script
# Released under MIT License.
# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
"""
Kill qq jobs in multiple directories.
Version 0.1.
Requires `uv`: https://docs.astral.sh/uv
"""
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "qq",
# ]
#
# [tool.uv.sources]
# qq = { git = "https://github.com/Ladme/qq.git", tag = "v0.6.2" }
# ///
import argparse
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from rich.console import Console
from rich.progress import BarColumn, Progress, TextColumn, TimeRemainingColumn
from qq_lib.core.common import get_info_files
from qq_lib.core.error import QQError
from qq_lib.info import Informer
from qq_lib.kill import Killer
console = Console()
# increase logging level to critical
logging.disable(logging.ERROR)
def get_killer(directory: str) -> Killer | None:
    """Build a Killer for the most recent qq job found in `directory`.

    Returns None when `directory` is not an existing directory, when it
    contains no qq info files, or when no informer could be loaded.
    """
    path = Path(directory)
    if not path.is_dir():
        return None
    # collect qq info files; the last entry is taken as the newest job
    # (assumes get_info_files returns them in age order — per qq_lib)
    info_files = get_info_files(path)
    if not info_files:
        return None
    # load the newest info file and turn it into a killer, if possible
    informer = Informer.fromFile(info_files[-1], None)
    return Killer.fromInformer(informer) if informer else None
def process_directory(directory: str) -> tuple[str, bool]:
    """Attempt to kill the newest qq job in `directory`.

    Returns a `(directory, success)` pair so that results collected from
    a thread pool can be mapped back to the directory they belong to.
    """
    killer = get_killer(directory)
    if not killer:
        # no directory / no info files / no informer — nothing to kill
        return (directory, False)

    # best-effort check: silently skip jobs not in a killable state
    try:
        killer.ensureSuitable()
    except Exception:
        return (directory, False)

    # actually kill the job; report qq-specific failures to the console
    try:
        killer.kill()
    except QQError as e:
        console.print(
            f"\n[red bold]ERROR[/red bold]. Could not kill the job in directory '{directory}': {e}"
        )
        return (directory, False)

    return (directory, True)
def kill_directories(
    directories: list[str], threads: int
) -> tuple[set[str], set[str]]:
    """Kill jobs in directories that are suitable to be killed.

    Args:
        directories: Directory paths as passed on the command line.
        threads: Maximum number of worker threads used in parallel.

    Returns:
        A tuple ``(killed, not_killed)`` of directory-path sets.
    """
    # prepare a progress bar shown while the jobs are being killed
    progress = Progress(
        TextColumn("Killing jobs"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeRemainingColumn(),
        console=console,
        expand=False,
    )
    # NOTE: annotations fixed — this function receives argparse strings
    # and `process_directory` echoes them back unchanged, so the sets
    # hold `str`, not `Path`.
    killed_jobs: set[str] = set()
    not_killed_jobs: set[str] = set()
    with progress:
        # register a new progress task (for the progress bar)
        task = progress.add_task("kill", total=len(directories))
        # create a thread pool for parallel processing of jobs
        with ThreadPoolExecutor(max_workers=threads) as executor:
            # submit each job; `futures` maps Future objects to directories
            futures = {executor.submit(process_directory, d): d for d in directories}
            # iterate over futures as they finish (in arbitrary order)
            for future in as_completed(futures):
                directory, success = future.result()
                if success:
                    killed_jobs.add(directory)
                else:
                    not_killed_jobs.add(directory)
                # one job processed -> advance the progress bar by one
                progress.update(task, advance=1)
    return killed_jobs, not_killed_jobs
def main():
    """Parse the command line and kill qq jobs in the given directories."""
    parser = argparse.ArgumentParser(
        "multi-kill",
        description="Kill qq jobs in multiple directories.",
    )
    parser.add_argument(
        "directories", nargs="+", help="Directories containing qq info files."
    )
    parser.add_argument(
        "-t",
        "--threads",
        type=int,
        default=16,
        help="Number of worker threads (default: 16)",
    )
    args = parser.parse_args()

    console.print()
    # kill the jobs in parallel
    killed, not_killed = kill_directories(args.directories, args.threads)

    # no directory yielded a job at all
    if not (killed or not_killed):
        console.print("[bold]Nothing to kill.[/bold]\n")
        return

    # summary: jobs that were killed successfully
    console.print(
        f"\n[bright_green bold]{'KILLED SUCCESSFULLY':25s}[/bright_green bold] [default]{len(killed)}[/default]"
    )
    console.print(f"[grey70]{' '.join(str(x) for x in sorted(killed))}[/grey70]")
    # summary: jobs that could not be killed
    console.print(
        f"\n[bright_red bold]{'COULD NOT KILL':25s}[/bright_red bold] [default]{len(not_killed)}[/default]"
    )
    console.print(f"[grey70]{' '.join(str(x) for x in sorted(not_killed))}[/grey70]\n")
if __name__ == "__main__":
main()