diff --git a/packages/cli/src/tts/manager.ts b/packages/cli/src/tts/manager.ts index 317f8d307..87ed079ad 100644 --- a/packages/cli/src/tts/manager.ts +++ b/packages/cli/src/tts/manager.ts @@ -1,8 +1,47 @@ -import { existsSync, mkdirSync } from "node:fs"; +import { existsSync, mkdirSync, writeFileSync, unlinkSync, readFileSync, statSync } from "node:fs"; import { homedir } from "node:os"; import { join } from "node:path"; import { downloadFile } from "../utils/download.js"; +const LOCK_STALE_MS = 300_000; // 5 min — generous to cover large model downloads + +function acquireLock(lockPath: string): boolean { + try { + writeFileSync(lockPath, String(process.pid), { flag: "wx" }); + return true; + } catch { + // Lock exists — check if it's stale + try { + const stat = statSync(lockPath); + if (Date.now() - stat.mtimeMs > LOCK_STALE_MS) { + unlinkSync(lockPath); + writeFileSync(lockPath, String(process.pid), { flag: "wx" }); + return true; + } + } catch { + // Race — another process grabbed it + } + return false; + } +} + +function releaseLock(lockPath: string): void { + try { + const content = readFileSync(lockPath, "utf-8").trim(); + if (content === String(process.pid)) unlinkSync(lockPath); + } catch { + // Already cleaned up + } +} + +async function waitForLock(lockPath: string, timeoutMs: number = 300_000): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (!existsSync(lockPath)) return; + await new Promise((r) => setTimeout(r, 1_000)); + } +} + const CACHE_DIR = join(homedir(), ".cache", "hyperframes", "tts"); const MODELS_DIR = join(CACHE_DIR, "models"); const VOICES_DIR = join(CACHE_DIR, "voices"); @@ -114,8 +153,20 @@ export async function ensureModel( } mkdirSync(MODELS_DIR, { recursive: true }); - options?.onProgress?.(`Downloading TTS model ${model} (~311 MB)...`); - await downloadFile(url, modelPath); + const lockPath = `${modelPath}.lock`; + if (!acquireLock(lockPath)) { + options?.onProgress?.("Another process is downloading the model, waiting..."); + await waitForLock(lockPath); + if (existsSync(modelPath)) return modelPath; + } + + try { + if (existsSync(modelPath)) return modelPath; + options?.onProgress?.(`Downloading TTS model ${model} (~311 MB)...`); + await downloadFile(url, modelPath); + } finally { + releaseLock(lockPath); + } if (!existsSync(modelPath)) { throw new Error(`Model download failed: ${model}`); @@ -135,8 +186,20 @@ export async function ensureVoices(options?: { if (existsSync(voicesPath)) return voicesPath; mkdirSync(VOICES_DIR, { recursive: true }); - options?.onProgress?.("Downloading voice data (~27 MB)..."); - await downloadFile(VOICES_URL, voicesPath); + const lockPath = `${voicesPath}.lock`; + if (!acquireLock(lockPath)) { + options?.onProgress?.("Another process is downloading voice data, waiting..."); + await waitForLock(lockPath); + if (existsSync(voicesPath)) return voicesPath; + } + + try { + if (existsSync(voicesPath)) return voicesPath; + options?.onProgress?.("Downloading voice data (~27 MB)..."); + await downloadFile(VOICES_URL, voicesPath); + } finally { + releaseLock(lockPath); + } if (!existsSync(voicesPath)) { throw new Error("Voice data download failed"); diff --git a/packages/cli/src/tts/synthesize.ts b/packages/cli/src/tts/synthesize.ts index 829417914..a50f77add 100644 --- a/packages/cli/src/tts/synthesize.ts +++ b/packages/cli/src/tts/synthesize.ts @@ -48,7 +48,7 @@ function hasPythonPackage(python: string, pkg: string): boolean { try { execFileSync(python, ["-c", `import ${pkg}`], { stdio: ["pipe", "pipe", "pipe"], - timeout: 10_000, + timeout: 30_000, // ONNX runtime import can take 10+ seconds on cold start }); return true; } catch { @@ -203,6 +203,7 @@ export async function synthesize( { encoding: "utf-8", timeout: 300_000, + maxBuffer: 10 * 1024 * 1024, // 10 MB — ONNX runtime prints verbose warnings stdio: ["pipe", "pipe", "pipe"], }, ); diff --git a/packages/cli/src/utils/download.ts b/packages/cli/src/utils/download.ts index 5b8825625..0ea3bec7a 100644 --- a/packages/cli/src/utils/download.ts +++ b/packages/cli/src/utils/download.ts @@ -1,28 +1,52 @@ -import { createWriteStream, renameSync, unlinkSync } from "node:fs"; +import { createWriteStream, renameSync, unlinkSync, existsSync } from "node:fs"; import { get as httpsGet } from "node:https"; import { pipeline } from "node:stream/promises"; -/** - * Download a file from a URL, following redirects. - * Uses atomic write (download to .tmp, rename on success) to prevent - * corrupt partial files from persisting in the cache on interruption. - */ -export function downloadFile(url: string, dest: string): Promise { +const SOCKET_TIMEOUT_MS = 30_000; +const RESPONSE_TIMEOUT_MS = 60_000; +const MAX_RETRIES = 3; +const RETRY_BASE_MS = 1_000; +const MAX_REDIRECTS = 10; + +function attempt(url: string, dest: string): Promise { const tmp = `${dest}.tmp`; return new Promise((resolve, reject) => { + let redirects = 0; + const follow = (u: string) => { - httpsGet(u, (res) => { + if (++redirects > MAX_REDIRECTS) { + reject(new Error("Download failed: too many redirects")); + return; + } + const req = httpsGet(u, (res) => { if (res.statusCode === 301 || res.statusCode === 302) { const location = res.headers.location; if (location) { + res.resume(); follow(location); return; } } + if (res.statusCode === 403 || res.statusCode === 429) { + res.resume(); + reject( + new Error( + `Download failed: HTTP ${res.statusCode} (rate limited). ` + + `GitHub throttles unauthenticated release downloads. Retry in a moment.`, + ), + ); + return; + } if (res.statusCode !== 200) { + res.resume(); reject(new Error(`Download failed: HTTP ${res.statusCode}`)); return; } + + res.setTimeout(RESPONSE_TIMEOUT_MS, () => { + res.destroy(new Error("Download stalled: response timeout")); + }); + const file = createWriteStream(tmp); pipeline(res, file) .then(() => { @@ -37,9 +61,13 @@ export function downloadFile(url: string, dest: string): Promise { } reject(err); }); - }).on("error", (err) => { + }); + req.setTimeout(SOCKET_TIMEOUT_MS, () => { + req.destroy(new Error("Download failed: connection timeout")); + }); + req.on("error", (err) => { try { - unlinkSync(tmp); + if (existsSync(tmp)) unlinkSync(tmp); } catch { // ignore cleanup failure } @@ -49,3 +77,25 @@ export function downloadFile(url: string, dest: string): Promise { follow(url); }); } + +/** + * Download a file from a URL with retry, following redirects. + * Uses atomic write (download to .tmp, rename on success) to prevent + * corrupt partial files from persisting in the cache on interruption. + */ +export async function downloadFile(url: string, dest: string): Promise { + let lastErr: Error | undefined; + for (let i = 0; i <= MAX_RETRIES; i++) { + try { + await attempt(url, dest); + return; + } catch (err) { + lastErr = err instanceof Error ? err : new Error(String(err)); + if (i < MAX_RETRIES) { + const delay = RETRY_BASE_MS * 2 ** i; + await new Promise((r) => setTimeout(r, delay)); + } + } + } + throw lastErr; +}