From 3403d3c941444b97c566602bee368c5184f2c875 Mon Sep 17 00:00:00 2001 From: Richard Alpe Date: Tue, 12 May 2026 10:28:44 +0200 Subject: [PATCH 1/4] Add paused-marker check to halt dequeue WorkQueue.process_next short-circuits when /paused exists. In-flight jobs finish normally; nothing new moves from todo/ to doing/. Signed-off-by: Richard Alpe --- ghmoon | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ghmoon b/ghmoon index 61a0df3..e7c2606 100755 --- a/ghmoon +++ b/ghmoon @@ -289,6 +289,7 @@ class WorkQueue: "doing": os.path.join(path, "doing"), "done": os.path.join(path, "done"), } + self.pause_file = os.path.join(path, "paused") for path in self.paths.values(): os.makedirs(path, exist_ok=True) @@ -421,6 +422,11 @@ class WorkQueue: return def process_next(self): + if os.path.exists(self.pause_file): + if os.listdir(self.paths["todo"]): + sys.stderr.write("Skipping todo, work is paused\n") + return False + job = self._dequeue() if not job: return False From 2f193d2e897c56e00b9482421f29c4e13b0c3738 Mon Sep 17 00:00:00 2001 From: Richard Alpe Date: Tue, 12 May 2026 10:29:38 +0200 Subject: [PATCH 2/4] Store job result json Mirrors GitHub-style commit-status semantics: one current result per job, overwritten on each run. This can be used locally to display test results without access to github. Signed-off-by: Richard Alpe --- ghmoon | 51 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/ghmoon b/ghmoon index e7c2606..5d9c4dd 100755 --- a/ghmoon +++ b/ghmoon @@ -227,7 +227,7 @@ class GHCommit: else: status("error", "Internal error", gist) - return state == "success" + return state, gist class GHRepo: @@ -260,7 +260,10 @@ class GHRepo: if artifact.sha != sha: continue - self.process({ "sha": sha, "artifact": artifact.data }, interactive) + return self.process({ "sha": sha, "artifact": artifact.data }, interactive) + + sys.stderr.write(f"No matching artifact for {sha} in {str(self)}\n") + return "error", None def process(self, job, interactive=True): sha = job["sha"] @@ -271,7 +274,7 @@ class GHRepo: if not c.exists(): sys.stderr.write(f"Ignoring non-existing commit {sha} from {str(self)}\n") - return False + return "error", None c.checkout() @@ -290,9 +293,10 @@ class WorkQueue: "done": os.path.join(path, "done"), } self.pause_file = os.path.join(path, "paused") + self.result_path = os.path.join(path, "result") - for path in self.paths.values(): - os.makedirs(path, exist_ok=True) + for p in list(self.paths.values()) + [self.result_path]: + os.makedirs(p, exist_ok=True) if manage: for job in os.listdir(self.paths["doing"]): @@ -300,6 +304,20 @@ class WorkQueue: os.rename(os.path.join(self.paths["doing"], job), os.path.join(self.paths["todo"], job)) + def write_result(self, name, state, gist=None, report_log=None, error=None): + result = { + "state": state, + "gist": gist, + "report_log": report_log, + "finished_at": time.time(), + "error": error, + } + path = os.path.join(self.result_path, f"{name}.json") + tmp = path + ".tmp" + with open(tmp, "w") as f: + f.write(json.dumps(result)) + os.replace(tmp, path) + def _job_paths(self, name): return { state: os.path.join(self.paths[state], f"{name}.json") @@ -397,10 +415,12 @@ class WorkQueue: if not ("repo" in job and "sha" in job): sys.stderr.write(f"Skipping {name}: Malformed process job\n") + self.write_result(name, "error", error="Malformed process job") os.rename(paths["todo"], paths["done"]) return if not job["repo"] in repos: sys.stderr.write(f"Skipping {name}: Unknown repo {job['repo']}\n") + self.write_result(name, "error", error=f"Unknown repo {job['repo']}") os.rename(paths["todo"], paths["done"]) return @@ -410,13 +430,24 @@ class WorkQueue: sys.stderr.write(f"Skipping {name}: Job disappeared\n") return + state, gist, error = "error", None, None try: sys.stderr.write(f"Processing {name}\n") - repos[job["repo"]].process(job, not self.publish) + state, gist = repos[job["repo"]].process(job, not self.publish) sys.stderr.write(f"Processed {name}\n") except Exception as e: sys.stderr.write(f"Aborting {name}: {str(e)}\n") traceback.print_exc() + error = str(e) + + repo_obj = repos.get(job["repo"]) + report_log = None + if repo_obj: + candidate = f"{repo_obj.path}/report-{job['sha']}.log" + if os.path.exists(candidate): + report_log = candidate + + self.write_result(name, state, gist=gist, report_log=report_log, error=error) os.rename(paths["doing"], paths["done"]) return @@ -521,4 +552,10 @@ if __name__ == "__main__": elif args.cmd == "enqueue": wq.enqueue() elif args.cmd == "process": - sys.exit(0 if repos[args.repo].process_standalone(args.sha, not args.publish) else 1) + repo_obj = repos[args.repo] + state, gist = repo_obj.process_standalone(args.sha, not args.publish) + name = f"{args.repo.replace('/', '-')}-{args.sha}" + candidate = f"{repo_obj.path}/report-{args.sha}.log" + report_log = candidate if os.path.exists(candidate) else None + wq.write_result(name, state, gist=gist, report_log=report_log) + sys.exit(0 if state == "success" else 1) From c36ccc50de37e7a5567772a86f5e112d525ba144 Mon Sep 17 00:00:00 2001 From: Richard Alpe Date: Tue, 12 May 2026 09:40:05 +0200 Subject: [PATCH 3/4] Record commit-subject heading in job and result JSON When enqueueing, fetch the first line of the commit message via `gh api repos//commits/` and store it as `heading` in the job. This is intended to make life easier for humans. Signed-off-by: Richard Alpe --- ghmoon | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/ghmoon b/ghmoon index 5d9c4dd..04b39c5 100755 --- a/ghmoon +++ b/ghmoon @@ -304,13 +304,14 @@ class WorkQueue: os.rename(os.path.join(self.paths["doing"], job), os.path.join(self.paths["todo"], job)) - def write_result(self, name, state, gist=None, report_log=None, error=None): + def write_result(self, name, state, gist=None, report_log=None, error=None, heading=None): result = { "state": state, "gist": gist, "report_log": report_log, "finished_at": time.time(), "error": error, + "heading": heading, } path = os.path.join(self.result_path, f"{name}.json") tmp = path + ".tmp" @@ -367,6 +368,7 @@ class WorkQueue: "type": "process", "repo": str(repo), "sha": artifact.sha, + "heading": repo.api(f"commits/{artifact.sha}")["commit"]["message"].splitlines()[0], "artifact": artifact.data, }) @@ -447,7 +449,8 @@ class WorkQueue: if os.path.exists(candidate): report_log = candidate - self.write_result(name, state, gist=gist, report_log=report_log, error=error) + self.write_result(name, state, gist=gist, report_log=report_log, error=error, + heading=job.get("heading")) os.rename(paths["doing"], paths["done"]) return @@ -555,7 +558,8 @@ if __name__ == "__main__": repo_obj = repos[args.repo] state, gist = repo_obj.process_standalone(args.sha, not args.publish) name = f"{args.repo.replace('/', '-')}-{args.sha}" + heading = repo_obj.api(f"commits/{args.sha}")["commit"]["message"].splitlines()[0] candidate = f"{repo_obj.path}/report-{args.sha}.log" report_log = candidate if os.path.exists(candidate) else None - wq.write_result(name, state, gist=gist, report_log=report_log) + wq.write_result(name, state, gist=gist, report_log=report_log, heading=heading) sys.exit(0 if state == "success" else 1) From cf5d05cbaf13ceda4d2925a147a1ea5acdeafe63 Mon Sep 17 00:00:00 2001 From: Richard Alpe Date: Tue, 19 May 2026 13:17:56 +0200 Subject: [PATCH 4/4] Add the ability to force run jobs while paused Start by locking at files in todo/forced, these files has priority over todo/*.json and are started even if a pause file exists. Nothing in ghmoon currently moves a todo entry to todo/forced. This is handled by the user or external tools. Signed-off-by: Richard Alpe --- ghmoon | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/ghmoon b/ghmoon index 04b39c5..87e711a 100755 --- a/ghmoon +++ b/ghmoon @@ -289,6 +289,7 @@ class WorkQueue: self.publish = publish self.paths = { "todo": os.path.join(path, "todo"), + "forced": os.path.join(path, "todo", "forced"), "doing": os.path.join(path, "doing"), "done": os.path.join(path, "done"), } @@ -349,6 +350,11 @@ class WorkQueue: os.remove(paths["todo"]) sys.stderr.write(f"Ignoring {name}: already processed\n") return False + elif os.path.exists(paths["forced"]): + os.close(fd) + os.remove(paths["todo"]) + sys.stderr.write(f"Ignoring {name}: already forced\n") + return False with os.fdopen(fd, "w") as f: f.write(json.dumps(data)) @@ -383,11 +389,11 @@ class WorkQueue: out = subprocess.check_output(["ls", "-rt", "--time=birth", path], text=True) return out.splitlines() - def _dequeue(self): + def _dequeue(self, lane): assert not os.listdir(self.paths["doing"]), "Another job is alredy in progress" - abspaths = [os.path.join(self.paths["todo"], job) - for job in self.listdir(self.paths["todo"])] + entries = [e for e in self.listdir(self.paths[lane]) if e.endswith(".json")] + abspaths = [os.path.join(self.paths[lane], e) for e in entries] for abspath in abspaths: try: with open(abspath) as f: @@ -411,23 +417,24 @@ class WorkQueue: return None - def _process_job(self, job): + def _process_job(self, job, src_lane): name = job["name"] paths = self._job_paths(name) + src = paths[src_lane] if not ("repo" in job and "sha" in job): sys.stderr.write(f"Skipping {name}: Malformed process job\n") self.write_result(name, "error", error="Malformed process job") - os.rename(paths["todo"], paths["done"]) + os.rename(src, paths["done"]) return if not job["repo"] in repos: sys.stderr.write(f"Skipping {name}: Unknown repo {job['repo']}\n") self.write_result(name, "error", error=f"Unknown repo {job['repo']}") - os.rename(paths["todo"], paths["done"]) + os.rename(src, paths["done"]) return try: - os.rename(paths["todo"], paths["doing"]) + os.rename(src, paths["doing"]) except: sys.stderr.write(f"Skipping {name}: Job disappeared\n") return @@ -456,17 +463,20 @@ class WorkQueue: return def process_next(self): - if os.path.exists(self.pause_file): - if os.listdir(self.paths["todo"]): - sys.stderr.write("Skipping todo, work is paused\n") - return False + paused = os.path.exists(self.pause_file) - job = self._dequeue() + src_lane = "forced" + job = self._dequeue(src_lane) + if not job and not paused: + src_lane = "todo" + job = self._dequeue(src_lane) if not job: + if paused and any(e.endswith(".json") for e in os.listdir(self.paths["todo"])): + sys.stderr.write("Skipping todo, work is paused\n") return False if job["type"] == "process": - self._process_job(job) + self._process_job(job, src_lane) return True