Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions packages/engine/src/services/chunkEncoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import { spawn } from "child_process";
import { copyFileSync, existsSync, mkdirSync, readdirSync, statSync, writeFileSync } from "fs";
import { join, dirname } from "path";
import { trackChildProcess } from "../utils/processTracker.js";
import { killWithEscalation, trackChildProcess } from "../utils/processTracker.js";
import { DEFAULT_CONFIG, type EngineConfig } from "../config.js";
import {
type GpuEncoder,
Expand Down Expand Up @@ -414,29 +414,37 @@ export async function encodeFramesFromDir(
const ffmpeg = spawn("ffmpeg", args);
trackChildProcess(ffmpeg);
let stderr = "";
let timedOut = false;
const cancelEscalations: Array<() => void> = [];
const onAbort = () => {
ffmpeg.kill("SIGTERM");
cancelEscalations.push(killWithEscalation(ffmpeg));
};
if (signal) {
if (signal.aborted) {
ffmpeg.kill("SIGTERM");
onAbort();
} else {
signal.addEventListener("abort", onAbort, { once: true });
}
}

const encodeTimeout = config?.ffmpegEncodeTimeout ?? DEFAULT_CONFIG.ffmpegEncodeTimeout;
const timer = setTimeout(() => {
ffmpeg.kill("SIGTERM");
timedOut = true;
cancelEscalations.push(killWithEscalation(ffmpeg));
}, encodeTimeout);

ffmpeg.stderr.on("data", (data) => {
stderr += data.toString();
});

ffmpeg.on("close", (code) => {
const cleanup = () => {
clearTimeout(timer);
for (const cancel of cancelEscalations) cancel();
if (signal) signal.removeEventListener("abort", onAbort);
};

ffmpeg.on("close", (code) => {
cleanup();
const durationMs = Date.now() - startTime;
if (signal?.aborted) {
resolve({
Expand All @@ -457,7 +465,11 @@ export async function encodeFramesFromDir(
durationMs,
framesEncoded: 0,
fileSize: 0,
error: formatFfmpegError(code, stderr),
// A timeout kill exits with a signal (code null); name the actual
// cause instead of the unhelpful "[FFmpeg] process error".
error: timedOut
? `[FFmpeg] encode timed out after ${encodeTimeout}ms`
: formatFfmpegError(code, stderr),
});
return;
}
Expand All @@ -467,8 +479,7 @@ export async function encodeFramesFromDir(
});

ffmpeg.on("error", (err) => {
clearTimeout(timer);
if (signal) signal.removeEventListener("abort", onAbort);
cleanup();
resolve({
success: false,
outputPath,
Expand Down
11 changes: 7 additions & 4 deletions packages/engine/src/services/streamingEncoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/

import { spawn, type ChildProcess } from "child_process";
import { trackChildProcess } from "../utils/processTracker.js";
import { killWithEscalation, trackChildProcess } from "../utils/processTracker.js";
import { existsSync, mkdirSync, statSync } from "fs";
import { dirname } from "path";

Expand Down Expand Up @@ -390,19 +390,22 @@ export async function spawnStreamingEncoder(
let exitCode: number | null = null;
let exitPromiseResolve: ((value: void) => void) | null = null;
const exitPromise = new Promise<void>((resolve) => (exitPromiseResolve = resolve));
const cancelEscalations: Array<() => void> = [];

// Track stderr for progress and error messages
ffmpeg.stderr?.on("data", (data: Buffer) => {
stderr += data.toString();
});

ffmpeg.on("close", (code: number | null) => {
for (const cancel of cancelEscalations) cancel();
exitCode = code;
exitStatus = code === 0 ? "success" : "error";
exitPromiseResolve?.();
});

ffmpeg.on("error", (err: Error) => {
for (const cancel of cancelEscalations) cancel();
exitStatus = "error";
stderr += `\nProcess error: ${err.message}`;
exitPromiseResolve?.();
Expand All @@ -414,12 +417,12 @@ export async function spawnStreamingEncoder(
// Handle abort signal
const onAbort = () => {
if (exitStatus === "running") {
ffmpeg.kill("SIGTERM");
cancelEscalations.push(killWithEscalation(ffmpeg));
}
};
if (signal) {
if (signal.aborted) {
ffmpeg.kill("SIGTERM");
onAbort();
} else {
signal.addEventListener("abort", onAbort, { once: true });
}
Expand All @@ -440,7 +443,7 @@ export async function spawnStreamingEncoder(
if (timer) clearTimeout(timer);
timer = setTimeout(() => {
if (exitStatus === "running") {
ffmpeg.kill("SIGTERM");
cancelEscalations.push(killWithEscalation(ffmpeg));
}
}, streamingTimeout);
};
Expand Down
31 changes: 23 additions & 8 deletions packages/engine/src/services/videoFrameExtractor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { existsSync, mkdirSync, readdirSync, rmSync } from "fs";
import { isAbsolute, join, posix, resolve, sep } from "path";
import { parseHTML } from "linkedom";
import { decodeUrlPathVariants } from "@hyperframes/core";
import { trackChildProcess } from "../utils/processTracker.js";
import { killWithEscalation, trackChildProcess } from "../utils/processTracker.js";
import { extractMediaMetadata, type VideoMetadata } from "../utils/ffprobe.js";
import {
analyzeCompositionHdr,
Expand Down Expand Up @@ -262,34 +262,50 @@ export async function extractVideoFramesRange(
const ffmpeg = spawn("ffmpeg", args);
trackChildProcess(ffmpeg);
let stderr = "";
let timedOut = false;
const cancelEscalations: Array<() => void> = [];
const onAbort = () => {
ffmpeg.kill("SIGTERM");
cancelEscalations.push(killWithEscalation(ffmpeg));
};
if (signal) {
if (signal.aborted) {
ffmpeg.kill("SIGTERM");
onAbort();
} else {
signal.addEventListener("abort", onAbort, { once: true });
}
}

const timer = setTimeout(() => {
ffmpeg.kill("SIGTERM");
timedOut = true;
cancelEscalations.push(killWithEscalation(ffmpeg));
}, ffmpegProcessTimeout);

ffmpeg.stderr.on("data", (data) => {
stderr += data.toString();
});

ffmpeg.on("close", (code) => {
const cleanup = () => {
clearTimeout(timer);
for (const cancel of cancelEscalations) cancel();
if (signal) signal.removeEventListener("abort", onAbort);
};

ffmpeg.on("close", (code) => {
cleanup();
if (signal?.aborted) {
reject(new Error("Video frame extraction cancelled"));
return;
}
if (code !== 0) {
reject(new Error(`FFmpeg exited with code ${code}: ${stderr.slice(-500)}`));
// A timeout kill exits with a signal (code null); name the actual
// cause instead of the unhelpful "exited with code null".
reject(
new Error(
timedOut
? `FFmpeg frame extraction timed out after ${ffmpegProcessTimeout}ms: ${stderr.slice(-500)}`
: `FFmpeg exited with code ${code}: ${stderr.slice(-500)}`,
),
);
return;
}

Expand All @@ -314,8 +330,7 @@ export async function extractVideoFramesRange(
});

ffmpeg.on("error", (err) => {
clearTimeout(timer);
if (signal) signal.removeEventListener("abort", onAbort);
cleanup();
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
reject(new Error("[FFmpeg] ffmpeg not found"));
} else {
Expand Down
11 changes: 4 additions & 7 deletions packages/engine/src/utils/gpuEncoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { spawn } from "child_process";
import { killWithEscalation } from "./processTracker.js";

export type ConcreteGpuEncoder = "nvenc" | "videotoolbox" | "vaapi" | "qsv" | "amf";
export type GpuEncoder = ConcreteGpuEncoder | null;
Expand Down Expand Up @@ -138,13 +139,13 @@ async function canUseGpuEncoder(encoder: ConcreteGpuEncoder): Promise<boolean> {
return new Promise((resolve) => {
let settled = false;
let timedOut = false;
let killTimer: ReturnType<typeof setTimeout> | undefined;
let cancelEscalation: (() => void) | null = null;
let stderr = "";
const finish = (usable: boolean) => {
if (settled) return;
settled = true;
clearTimeout(timer);
if (killTimer) clearTimeout(killTimer);
cancelEscalation?.();
resolve(usable);
};
const ffmpeg = spawn("ffmpeg", getProbeArgs(encoder), {
Expand All @@ -157,11 +158,7 @@ async function canUseGpuEncoder(encoder: ConcreteGpuEncoder): Promise<boolean> {

const timer = setTimeout(() => {
timedOut = true;
ffmpeg.kill("SIGTERM");
killTimer = setTimeout(() => {
ffmpeg.kill("SIGKILL");
finish(false);
}, GPU_PROBE_KILL_GRACE_MS);
cancelEscalation = killWithEscalation(ffmpeg, GPU_PROBE_KILL_GRACE_MS);
}, GPU_PROBE_TIMEOUT_MS);

ffmpeg.on("close", (code, signal) => {
Expand Down
65 changes: 64 additions & 1 deletion packages/engine/src/utils/processTracker.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it, expect, beforeEach } from "vitest";
import { spawn } from "node:child_process";
import { trackChildProcess, killTrackedProcesses } from "./processTracker.js";
import { trackChildProcess, killTrackedProcesses, killWithEscalation } from "./processTracker.js";

// Reset tracked set between tests by killing everything
beforeEach(() => {
Expand Down Expand Up @@ -72,3 +72,66 @@ describe("killTrackedProcesses", () => {
killTrackedProcesses();
});
});

// On Windows, kill("SIGTERM") maps to TerminateProcess and is unconditional,
// so a trap-based shim can't ignore it and the SIGKILL escalation is never
// reached. The whole block exercises POSIX-only signal semantics; skip it
// there, same as the runFfmpeg kill-escalation suite.
describe.skipIf(process.platform === "win32")("killWithEscalation", () => {
it("kills a SIGTERM-compliant process", async () => {
const proc = spawn("sleep", ["60"], { stdio: "ignore" });

const exitPromise = new Promise<void>((resolve) => proc.on("close", () => resolve()));
const cancel = killWithEscalation(proc);

await exitPromise;
cancel();
expect(proc.signalCode).toBe("SIGTERM");
});

it("escalates to SIGKILL when the process ignores SIGTERM", async () => {
// Block on the bash builtin `read` (stdin held open by the pipe) instead
// of spawning `sleep`: a SIGKILLed bash reparents the sleep child, which
// then lingers for 60s and accumulates orphans in watch/parallel runs.
const proc = spawn("bash", ["-c", "trap '' TERM; read -t 60 _"], {
stdio: ["pipe", "ignore", "ignore"],
});
// Give bash a beat to install the trap; killing before that races the
// trap setup and SIGTERM would win legitimately.
await new Promise((resolve) => setTimeout(resolve, 200));

const exitPromise = new Promise<void>((resolve) => proc.on("close", () => resolve()));
const cancel = killWithEscalation(proc, 100);

await exitPromise;
cancel();
expect(proc.signalCode).toBe("SIGKILL");
}, 5000);

it("cancel clears the pending escalation", async () => {
const proc = spawn("bash", ["-c", "trap '' TERM; read -t 60 _"], {
stdio: ["pipe", "ignore", "ignore"],
});
await new Promise((resolve) => setTimeout(resolve, 200));

const cancel = killWithEscalation(proc, 100);
cancel();

// Past the grace period the process must still be alive: SIGTERM was
// trapped and the SIGKILL escalation was cancelled.
await new Promise((resolve) => setTimeout(resolve, 300));
expect(proc.exitCode).toBeNull();
expect(proc.signalCode).toBeNull();

proc.kill("SIGKILL");
await new Promise<void>((resolve) => proc.on("close", () => resolve()));
}, 5000);

it("does not throw on an already-exited process", async () => {
const proc = spawn("true", { stdio: "ignore" });
await new Promise<void>((resolve) => proc.on("close", () => resolve()));

const cancel = killWithEscalation(proc);
cancel();
});
});
35 changes: 35 additions & 0 deletions packages/engine/src/utils/processTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,41 @@ export function trackChildProcess(proc: ChildProcess): void {
proc.once("error", remove);
}

const KILL_ESCALATION_GRACE_MS = 500;

/**
* Kill a single child process with SIGTERM, escalating to SIGKILL if it has
* not exited after a short grace period. Same policy as
* killTrackedProcesses(), but for timeout/abort kills of one process whose
* caller is awaiting its `close` event — without the escalation, a process
* that ignores SIGTERM (stuck I/O, frozen pipe) never emits `close` and the
* awaiting promise hangs forever.
*
* Returns a cancel function; call it once the process exits so the
* escalation timer doesn't outlive it.
*/
export function killWithEscalation(
proc: ChildProcess,
graceMs: number = KILL_ESCALATION_GRACE_MS,
): () => void {
try {
proc.kill("SIGTERM");
} catch {
// Already exited.
}
const timer = setTimeout(() => {
if (proc.exitCode === null && proc.signalCode === null) {
try {
proc.kill("SIGKILL");
} catch {
// Already exited.
}
}
}, graceMs);
timer.unref();
return () => clearTimeout(timer);
}

/**
* SIGTERM all tracked child processes, then SIGKILL any that survive
* after a short grace period.
Expand Down
Loading
Loading