diff --git a/packages/engine/src/services/chunkEncoder.ts b/packages/engine/src/services/chunkEncoder.ts index b7ccae769..a3fff5694 100644 --- a/packages/engine/src/services/chunkEncoder.ts +++ b/packages/engine/src/services/chunkEncoder.ts @@ -8,7 +8,7 @@ import { spawn } from "child_process"; import { copyFileSync, existsSync, mkdirSync, readdirSync, statSync, writeFileSync } from "fs"; import { join, dirname } from "path"; -import { trackChildProcess } from "../utils/processTracker.js"; +import { killWithEscalation, trackChildProcess } from "../utils/processTracker.js"; import { DEFAULT_CONFIG, type EngineConfig } from "../config.js"; import { type GpuEncoder, @@ -414,12 +414,14 @@ export async function encodeFramesFromDir( const ffmpeg = spawn("ffmpeg", args); trackChildProcess(ffmpeg); let stderr = ""; + let timedOut = false; + const cancelEscalations: Array<() => void> = []; const onAbort = () => { - ffmpeg.kill("SIGTERM"); + cancelEscalations.push(killWithEscalation(ffmpeg)); }; if (signal) { if (signal.aborted) { - ffmpeg.kill("SIGTERM"); + onAbort(); } else { signal.addEventListener("abort", onAbort, { once: true }); } @@ -427,16 +429,22 @@ export async function encodeFramesFromDir( const encodeTimeout = config?.ffmpegEncodeTimeout ?? DEFAULT_CONFIG.ffmpegEncodeTimeout; const timer = setTimeout(() => { - ffmpeg.kill("SIGTERM"); + timedOut = true; + cancelEscalations.push(killWithEscalation(ffmpeg)); }, encodeTimeout); ffmpeg.stderr.on("data", (data) => { stderr += data.toString(); }); - ffmpeg.on("close", (code) => { + const cleanup = () => { clearTimeout(timer); + for (const cancel of cancelEscalations) cancel(); if (signal) signal.removeEventListener("abort", onAbort); + }; + + ffmpeg.on("close", (code) => { + cleanup(); const durationMs = Date.now() - startTime; if (signal?.aborted) { resolve({ @@ -457,7 +465,11 @@ export async function encodeFramesFromDir( durationMs, framesEncoded: 0, fileSize: 0, - error: formatFfmpegError(code, stderr), + // A timeout kill exits with a signal (code null); name the actual + // cause instead of the unhelpful "[FFmpeg] process error". + error: timedOut + ? `[FFmpeg] encode timed out after ${encodeTimeout}ms` + : formatFfmpegError(code, stderr), }); return; } @@ -467,8 +479,7 @@ export async function encodeFramesFromDir( }); ffmpeg.on("error", (err) => { - clearTimeout(timer); - if (signal) signal.removeEventListener("abort", onAbort); + cleanup(); resolve({ success: false, outputPath, diff --git a/packages/engine/src/services/streamingEncoder.ts b/packages/engine/src/services/streamingEncoder.ts index 08f6cbcbb..3743ae6d5 100644 --- a/packages/engine/src/services/streamingEncoder.ts +++ b/packages/engine/src/services/streamingEncoder.ts @@ -13,7 +13,7 @@ */ import { spawn, type ChildProcess } from "child_process"; -import { trackChildProcess } from "../utils/processTracker.js"; +import { killWithEscalation, trackChildProcess } from "../utils/processTracker.js"; import { existsSync, mkdirSync, statSync } from "fs"; import { dirname } from "path"; @@ -390,6 +390,7 @@ export async function spawnStreamingEncoder( let exitCode: number | null = null; let exitPromiseResolve: ((value: void) => void) | null = null; const exitPromise = new Promise((resolve) => (exitPromiseResolve = resolve)); + const cancelEscalations: Array<() => void> = []; // Track stderr for progress and error messages ffmpeg.stderr?.on("data", (data: Buffer) => { @@ -397,12 +398,14 @@ export async function spawnStreamingEncoder( }); ffmpeg.on("close", (code: number | null) => { + for (const cancel of cancelEscalations) cancel(); exitCode = code; exitStatus = code === 0 ? "success" : "error"; exitPromiseResolve?.(); }); ffmpeg.on("error", (err: Error) => { + for (const cancel of cancelEscalations) cancel(); exitStatus = "error"; stderr += `\nProcess error: ${err.message}`; exitPromiseResolve?.(); @@ -414,12 +417,12 @@ export async function spawnStreamingEncoder( // Handle abort signal const onAbort = () => { if (exitStatus === "running") { - ffmpeg.kill("SIGTERM"); + cancelEscalations.push(killWithEscalation(ffmpeg)); } }; if (signal) { if (signal.aborted) { - ffmpeg.kill("SIGTERM"); + onAbort(); } else { signal.addEventListener("abort", onAbort, { once: true }); } @@ -440,7 +443,7 @@ export async function spawnStreamingEncoder( if (timer) clearTimeout(timer); timer = setTimeout(() => { if (exitStatus === "running") { - ffmpeg.kill("SIGTERM"); + cancelEscalations.push(killWithEscalation(ffmpeg)); } }, streamingTimeout); }; diff --git a/packages/engine/src/services/videoFrameExtractor.ts b/packages/engine/src/services/videoFrameExtractor.ts index 096dd6b5c..cc2eafd6c 100644 --- a/packages/engine/src/services/videoFrameExtractor.ts +++ b/packages/engine/src/services/videoFrameExtractor.ts @@ -10,7 +10,7 @@ import { existsSync, mkdirSync, readdirSync, rmSync } from "fs"; import { isAbsolute, join, posix, resolve, sep } from "path"; import { parseHTML } from "linkedom"; import { decodeUrlPathVariants } from "@hyperframes/core"; -import { trackChildProcess } from "../utils/processTracker.js"; +import { killWithEscalation, trackChildProcess } from "../utils/processTracker.js"; import { extractMediaMetadata, type VideoMetadata } from "../utils/ffprobe.js"; import { analyzeCompositionHdr, @@ -262,34 +262,50 @@ export async function extractVideoFramesRange( const ffmpeg = spawn("ffmpeg", args); trackChildProcess(ffmpeg); let stderr = ""; + let timedOut = false; + const cancelEscalations: Array<() => void> = []; const onAbort = () => { - ffmpeg.kill("SIGTERM"); + cancelEscalations.push(killWithEscalation(ffmpeg)); }; if (signal) { if (signal.aborted) { - ffmpeg.kill("SIGTERM"); + onAbort(); } else { signal.addEventListener("abort", onAbort, { once: true }); } } const timer = setTimeout(() => { - ffmpeg.kill("SIGTERM"); + timedOut = true; + cancelEscalations.push(killWithEscalation(ffmpeg)); }, ffmpegProcessTimeout); ffmpeg.stderr.on("data", (data) => { stderr += data.toString(); }); - ffmpeg.on("close", (code) => { + const cleanup = () => { clearTimeout(timer); + for (const cancel of cancelEscalations) cancel(); if (signal) signal.removeEventListener("abort", onAbort); + }; + + ffmpeg.on("close", (code) => { + cleanup(); if (signal?.aborted) { reject(new Error("Video frame extraction cancelled")); return; } if (code !== 0) { - reject(new Error(`FFmpeg exited with code ${code}: ${stderr.slice(-500)}`)); + // A timeout kill exits with a signal (code null); name the actual + // cause instead of the unhelpful "exited with code null". + reject( + new Error( + timedOut + ? `FFmpeg frame extraction timed out after ${ffmpegProcessTimeout}ms: ${stderr.slice(-500)}` + : `FFmpeg exited with code ${code}: ${stderr.slice(-500)}`, + ), + ); return; } @@ -314,8 +330,7 @@ export async function extractVideoFramesRange( }); ffmpeg.on("error", (err) => { - clearTimeout(timer); - if (signal) signal.removeEventListener("abort", onAbort); + cleanup(); if ((err as NodeJS.ErrnoException).code === "ENOENT") { reject(new Error("[FFmpeg] ffmpeg not found")); } else { diff --git a/packages/engine/src/utils/gpuEncoder.ts b/packages/engine/src/utils/gpuEncoder.ts index fe1f58043..1570be40e 100644 --- a/packages/engine/src/utils/gpuEncoder.ts +++ b/packages/engine/src/utils/gpuEncoder.ts @@ -6,6 +6,7 @@ */ import { spawn } from "child_process"; +import { killWithEscalation } from "./processTracker.js"; export type ConcreteGpuEncoder = "nvenc" | "videotoolbox" | "vaapi" | "qsv" | "amf"; export type GpuEncoder = ConcreteGpuEncoder | null; @@ -138,13 +139,13 @@ async function canUseGpuEncoder(encoder: ConcreteGpuEncoder): Promise { return new Promise((resolve) => { let settled = false; let timedOut = false; - let killTimer: ReturnType | undefined; + let cancelEscalation: (() => void) | null = null; let stderr = ""; const finish = (usable: boolean) => { if (settled) return; settled = true; clearTimeout(timer); - if (killTimer) clearTimeout(killTimer); + cancelEscalation?.(); resolve(usable); }; const ffmpeg = spawn("ffmpeg", getProbeArgs(encoder), { @@ -157,11 +158,7 @@ async function canUseGpuEncoder(encoder: ConcreteGpuEncoder): Promise { const timer = setTimeout(() => { timedOut = true; - ffmpeg.kill("SIGTERM"); - killTimer = setTimeout(() => { - ffmpeg.kill("SIGKILL"); - finish(false); - }, GPU_PROBE_KILL_GRACE_MS); + cancelEscalation = killWithEscalation(ffmpeg, GPU_PROBE_KILL_GRACE_MS); }, GPU_PROBE_TIMEOUT_MS); ffmpeg.on("close", (code, signal) => { diff --git a/packages/engine/src/utils/processTracker.test.ts b/packages/engine/src/utils/processTracker.test.ts index 0e8d72232..707ebd1c0 100644 --- a/packages/engine/src/utils/processTracker.test.ts +++ b/packages/engine/src/utils/processTracker.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, beforeEach } from "vitest"; import { spawn } from "node:child_process"; -import { trackChildProcess, killTrackedProcesses } from "./processTracker.js"; +import { trackChildProcess, killTrackedProcesses, killWithEscalation } from "./processTracker.js"; // Reset tracked set between tests by killing everything beforeEach(() => { @@ -72,3 +72,66 @@ describe("killTrackedProcesses", () => { killTrackedProcesses(); }); }); + +// On Windows, kill("SIGTERM") maps to TerminateProcess and is unconditional, +// so a trap-based shim can't ignore it and the SIGKILL escalation is never +// reached. The whole block exercises POSIX-only signal semantics; skip it +// there, same as the runFfmpeg kill-escalation suite. +describe.skipIf(process.platform === "win32")("killWithEscalation", () => { + it("kills a SIGTERM-compliant process", async () => { + const proc = spawn("sleep", ["60"], { stdio: "ignore" }); + + const exitPromise = new Promise((resolve) => proc.on("close", () => resolve())); + const cancel = killWithEscalation(proc); + + await exitPromise; + cancel(); + expect(proc.signalCode).toBe("SIGTERM"); + }); + + it("escalates to SIGKILL when the process ignores SIGTERM", async () => { + // Block on the bash builtin `read` (stdin held open by the pipe) instead + // of spawning `sleep`: a SIGKILLed bash reparents the sleep child, which + // then lingers for 60s and accumulates orphans in watch/parallel runs. + const proc = spawn("bash", ["-c", "trap '' TERM; read -t 60 _"], { + stdio: ["pipe", "ignore", "ignore"], + }); + // Give bash a beat to install the trap; killing before that races the + // trap setup and SIGTERM would win legitimately. + await new Promise((resolve) => setTimeout(resolve, 200)); + + const exitPromise = new Promise((resolve) => proc.on("close", () => resolve())); + const cancel = killWithEscalation(proc, 100); + + await exitPromise; + cancel(); + expect(proc.signalCode).toBe("SIGKILL"); + }, 5000); + + it("cancel clears the pending escalation", async () => { + const proc = spawn("bash", ["-c", "trap '' TERM; read -t 60 _"], { + stdio: ["pipe", "ignore", "ignore"], + }); + await new Promise((resolve) => setTimeout(resolve, 200)); + + const cancel = killWithEscalation(proc, 100); + cancel(); + + // Past the grace period the process must still be alive: SIGTERM was + // trapped and the SIGKILL escalation was cancelled. + await new Promise((resolve) => setTimeout(resolve, 300)); + expect(proc.exitCode).toBeNull(); + expect(proc.signalCode).toBeNull(); + + proc.kill("SIGKILL"); + await new Promise((resolve) => proc.on("close", () => resolve())); + }, 5000); + + it("does not throw on an already-exited process", async () => { + const proc = spawn("true", { stdio: "ignore" }); + await new Promise((resolve) => proc.on("close", () => resolve())); + + const cancel = killWithEscalation(proc); + cancel(); + }); +}); diff --git a/packages/engine/src/utils/processTracker.ts b/packages/engine/src/utils/processTracker.ts index c75c9ce84..8139ac01c 100644 --- a/packages/engine/src/utils/processTracker.ts +++ b/packages/engine/src/utils/processTracker.ts @@ -9,6 +9,41 @@ export function trackChildProcess(proc: ChildProcess): void { proc.once("error", remove); } +const KILL_ESCALATION_GRACE_MS = 500; + +/** + * Kill a single child process with SIGTERM, escalating to SIGKILL if it has + * not exited after a short grace period. Same policy as + * killTrackedProcesses(), but for timeout/abort kills of one process whose + * caller is awaiting its `close` event — without the escalation, a process + * that ignores SIGTERM (stuck I/O, frozen pipe) never emits `close` and the + * awaiting promise hangs forever. + * + * Returns a cancel function; call it once the process exits so the + * escalation timer doesn't outlive it. + */ +export function killWithEscalation( + proc: ChildProcess, + graceMs: number = KILL_ESCALATION_GRACE_MS, +): () => void { + try { + proc.kill("SIGTERM"); + } catch { + // Already exited. + } + const timer = setTimeout(() => { + if (proc.exitCode === null && proc.signalCode === null) { + try { + proc.kill("SIGKILL"); + } catch { + // Already exited. + } + } + }, graceMs); + timer.unref(); + return () => clearTimeout(timer); +} + /** * SIGTERM all tracked child processes, then SIGKILL any that survive * after a short grace period. diff --git a/packages/engine/src/utils/runFfmpeg.test.ts b/packages/engine/src/utils/runFfmpeg.test.ts index 5c3c464e2..022616f84 100644 --- a/packages/engine/src/utils/runFfmpeg.test.ts +++ b/packages/engine/src/utils/runFfmpeg.test.ts @@ -1,6 +1,9 @@ -import { describe, expect, it } from "vitest"; +import { afterAll, beforeAll, describe, expect, it } from "vitest"; +import { chmodSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; -import { formatFfmpegError } from "./runFfmpeg.js"; +import { formatFfmpegError, runFfmpeg } from "./runFfmpeg.js"; describe("formatFfmpegError", () => { it("reports exit code alone when stderr is empty", () => { @@ -44,3 +47,70 @@ describe("formatFfmpegError", () => { expect(formatFfmpegError(null, "spawn ffmpeg ENOENT")).toBe("[FFmpeg] spawn ffmpeg ENOENT"); }); }); + +// Shadows the real ffmpeg via PATH with a script that traps SIGTERM. Before +// the SIGKILL escalation, both kill paths sent SIGTERM only, so a stuck +// ffmpeg never emitted `close` and these awaits hung forever (the test would +// fail by timeout). Shell-script shim, so skipped on Windows. +// +// The shim is careful about two things: +// - It prints "ready" to stderr after installing the trap, and the tests +// only treat a kill as trap-protected once that line is seen. Killing +// earlier races the trap setup and SIGTERM would win legitimately. +// - It blocks on the builtin `read` instead of spawning `sleep`, so no +// child process inherits the stdio pipes. A SIGKILLed shim closes its +// pipes immediately and `close` fires right away; an inherited pipe +// would defer `close` until the child exits. +describe.skipIf(process.platform === "win32")("runFfmpeg kill escalation", () => { + let fakeBinDir: string; + let originalPath: string | undefined; + + beforeAll(async () => { + fakeBinDir = mkdtempSync(join(tmpdir(), "hf-fake-ffmpeg-")); + const script = join(fakeBinDir, "ffmpeg"); + writeFileSync(script, '#!/usr/bin/env bash\ntrap "" TERM\necho ready >&2\nread -t 60 _\n'); + chmodSync(script, 0o755); + originalPath = process.env.PATH; + process.env.PATH = `${fakeBinDir}:${process.env.PATH ?? ""}`; + + // Warm-up run: the first exec of a fresh script file can be slow + // (macOS scans it on first run). Waiting for "ready" once here keeps + // the timed tests below from racing that one-time latency. + const controller = new AbortController(); + await runFfmpeg([], { + signal: controller.signal, + onStderr: (line) => { + if (line.includes("ready")) controller.abort(); + }, + }); + }, 15_000); + + afterAll(() => { + process.env.PATH = originalPath; + rmSync(fakeBinDir, { recursive: true, force: true }); + }); + + it("resolves instead of hanging when a timed-out ffmpeg ignores SIGTERM", async () => { + const result = await runFfmpeg([], { timeout: 500 }); + expect(result.success).toBe(false); + expect(result.exitCode).toBeNull(); + // Resolution must have come through the SIGKILL escalation: timeout + // (500ms) plus the escalation grace period, not a SIGTERM exit at + // the timeout mark. + expect(result.durationMs).toBeGreaterThanOrEqual(900); + }, 5000); + + it("resolves instead of hanging when an aborted ffmpeg ignores SIGTERM", async () => { + const controller = new AbortController(); + const result = await runFfmpeg([], { + signal: controller.signal, + onStderr: (line) => { + // Abort only after the trap is installed so SIGTERM is guaranteed + // to be ignored and the SIGKILL escalation is what unblocks us. + if (line.includes("ready")) controller.abort(); + }, + }); + expect(result.success).toBe(false); + expect(result.durationMs).toBeGreaterThanOrEqual(450); + }, 5000); +}); diff --git a/packages/engine/src/utils/runFfmpeg.ts b/packages/engine/src/utils/runFfmpeg.ts index c5ccbdf66..dc2761aaa 100644 --- a/packages/engine/src/utils/runFfmpeg.ts +++ b/packages/engine/src/utils/runFfmpeg.ts @@ -6,7 +6,7 @@ */ import { spawn } from "child_process"; -import { trackChildProcess } from "./processTracker.js"; +import { killWithEscalation, trackChildProcess } from "./processTracker.js"; export interface RunFfmpegOptions { signal?: AbortSignal; @@ -63,21 +63,22 @@ export async function runFfmpeg(args: string[], opts?: RunFfmpegOptions): Promis const ffmpeg = spawn("ffmpeg", args); trackChildProcess(ffmpeg); let stderr = ""; + const cancelEscalations: Array<() => void> = []; const onAbort = () => { - ffmpeg.kill("SIGTERM"); + cancelEscalations.push(killWithEscalation(ffmpeg)); }; if (signal) { if (signal.aborted) { - ffmpeg.kill("SIGTERM"); + onAbort(); } else { signal.addEventListener("abort", onAbort, { once: true }); } } const timer = setTimeout(() => { - ffmpeg.kill("SIGTERM"); + cancelEscalations.push(killWithEscalation(ffmpeg)); }, timeout); ffmpeg.stderr.on("data", (data: Buffer) => { @@ -88,9 +89,14 @@ export async function runFfmpeg(args: string[], opts?: RunFfmpegOptions): Promis } }); - ffmpeg.on("close", (code) => { + const cleanup = () => { clearTimeout(timer); + for (const cancel of cancelEscalations) cancel(); if (signal) signal.removeEventListener("abort", onAbort); + }; + + ffmpeg.on("close", (code) => { + cleanup(); resolve({ success: !signal?.aborted && code === 0, exitCode: code, @@ -100,8 +106,7 @@ export async function runFfmpeg(args: string[], opts?: RunFfmpegOptions): Promis }); ffmpeg.on("error", (err) => { - clearTimeout(timer); - if (signal) signal.removeEventListener("abort", onAbort); + cleanup(); resolve({ success: false, exitCode: null,