/** Point-in-time view of the gate: current load alongside the configured limits. */
export interface AdmissionSnapshot {
  active: number
  queued: number
  maxActive: number
  maxQueue: number
}

/** Handle for one admitted request; `release()` gives the slot back. */
export interface AdmissionLease {
  release(): void
}

/**
 * Rejection produced by {@link AdmissionGate.acquire}. Carries the reason the
 * request was turned away and the gate snapshot taken when the error was built.
 */
export class AdmissionRejectedError extends Error {
  public readonly reason: 'queue_full' | 'queue_timeout' | 'aborted'
  public readonly snapshot: AdmissionSnapshot

  constructor(
    message: string,
    reason: 'queue_full' | 'queue_timeout' | 'aborted',
    snapshot: AdmissionSnapshot,
  ) {
    super(message)
    this.reason = reason
    this.snapshot = snapshot
    this.name = 'AdmissionRejectedError'
  }
}

/** One queued `acquire()` call plus the resources armed on its behalf. */
interface Waiter {
  resolve: (lease: AdmissionLease) => void
  reject: (err: AdmissionRejectedError) => void
  signal?: AbortSignal
  timer?: ReturnType<typeof setTimeout>
  onAbort?: () => void
}

export interface AdmissionGateOptions {
  maxActive: number
  maxQueue: number
  queueTimeoutMs: number
}

/**
 * Counting gate with a bounded FIFO wait queue.
 *
 * Up to `maxActive` leases are outstanding at once. Further callers wait in a
 * queue of at most `maxQueue` entries; a queued caller is rejected when its
 * AbortSignal fires or, if `queueTimeoutMs` is greater than zero, when that
 * many milliseconds pass before a slot frees up.
 */
export class AdmissionGate {
  private active = 0
  private readonly waiters: Waiter[] = []

  /** @throws Error when any option is not an integer at or above its minimum. */
  constructor(private readonly opts: AdmissionGateOptions) {
    AdmissionGate.requireInt('maxActive', opts.maxActive, 1)
    AdmissionGate.requireInt('maxQueue', opts.maxQueue, 0)
    AdmissionGate.requireInt('queueTimeoutMs', opts.queueTimeoutMs, 0)
  }

  private static requireInt(label: string, value: number, min: number): void {
    if (Number.isInteger(value) && value >= min) return
    throw new Error(`invalid ${label}: ${value}`)
  }

  /** Current counters and limits. */
  snapshot(): AdmissionSnapshot {
    const { maxActive, maxQueue } = this.opts
    return { active: this.active, queued: this.waiters.length, maxActive, maxQueue }
  }

  /**
   * Take a slot, waiting in the queue when none is free.
   *
   * @param signal optional abort signal; aborting rejects with reason `aborted`.
   * @returns a promise for the lease; rejects with AdmissionRejectedError
   *   (`aborted`, `queue_full` or `queue_timeout`).
   */
  acquire(signal?: AbortSignal): Promise<AdmissionLease> {
    if (signal?.aborted) {
      return Promise.reject(this.rejection('admission aborted before queueing', 'aborted'))
    }

    const { maxActive, maxQueue, queueTimeoutMs } = this.opts

    if (this.active < maxActive) {
      this.active += 1
      return Promise.resolve(this.issueLease())
    }

    if (this.waiters.length >= maxQueue) {
      return Promise.reject(this.rejection('cli-bridge is saturated: admission queue is full', 'queue_full'))
    }

    return new Promise<AdmissionLease>((resolve, reject) => {
      const entry: Waiter = { resolve, reject, signal }

      entry.onAbort = () => {
        // Dequeue first so the snapshot attached to the error excludes this waiter.
        this.dequeue(entry)
        reject(this.rejection('cli-bridge admission aborted while queued', 'aborted'))
      }
      signal?.addEventListener('abort', entry.onAbort, { once: true })

      if (queueTimeoutMs > 0) {
        const timer = setTimeout(() => {
          this.dequeue(entry)
          reject(this.rejection(`cli-bridge admission timed out after ${this.opts.queueTimeoutMs}ms`, 'queue_timeout'))
        }, queueTimeoutMs)
        // Where the timer handle supports it, do not keep the event loop alive for it.
        timer.unref?.()
        entry.timer = timer
      }

      this.waiters.push(entry)
    })
  }

  /** Build a lease whose `release()` frees the slot only on its first call. */
  private issueLease(): AdmissionLease {
    let spent = false
    const release = (): void => {
      if (spent) return
      spent = true
      this.onLeaseReleased()
    }
    return { release }
  }

  /** Free one slot and hand it to the oldest waiter that has not been aborted. */
  private onLeaseReleased(): void {
    if (this.active > 0) this.active -= 1

    while (this.active < this.opts.maxActive) {
      const next = this.waiters.shift()
      if (next === undefined) return

      this.disarm(next)
      if (next.signal?.aborted) {
        next.reject(this.rejection('cli-bridge admission aborted while queued', 'aborted'))
        continue
      }

      this.active += 1
      next.resolve(this.issueLease())
      return
    }
  }

  /** Drop a waiter from the queue (if still present) and disarm it. */
  private dequeue(entry: Waiter): void {
    const at = this.waiters.indexOf(entry)
    if (at >= 0) this.waiters.splice(at, 1)
    this.disarm(entry)
  }

  /** Cancel the waiter's timeout and detach its abort listener. */
  private disarm(entry: Waiter): void {
    if (entry.timer) clearTimeout(entry.timer)
    if (entry.signal && entry.onAbort) {
      entry.signal.removeEventListener('abort', entry.onAbort)
    }
  }

  private rejection(message: string, reason: AdmissionRejectedError['reason']): AdmissionRejectedError {
    return new AdmissionRejectedError(message, reason, this.snapshot())
  }
}
scopedHostSpawner } matches(model: string): boolean { diff --git a/src/backends/codex.ts b/src/backends/codex.ts index 6db5c55..80dfcad 100644 --- a/src/backends/codex.ts +++ b/src/backends/codex.ts @@ -36,7 +36,7 @@ import { resolvePromptMessages, } from './profile-support.js' import { contentToText } from './content.js' -import { hostSpawner } from '../executors/host.js' +import { scopedHostSpawner } from '../executors/scoped-host.js' import type { Spawner } from '../executors/types.js' import { readProcessLines, waitForProcessClose } from './process-lines.js' import { killTree } from '../executors/process-tree.js' @@ -52,7 +52,7 @@ export class CodexBackend implements Backend { readonly name = 'codex' private readonly spawner: Spawner constructor(private readonly opts: CodexBackendOptions) { - this.spawner = opts.spawner ?? hostSpawner + this.spawner = opts.spawner ?? scopedHostSpawner } matches(model: string): boolean { diff --git a/src/backends/kimi.ts b/src/backends/kimi.ts index bb653f0..0082ae1 100644 --- a/src/backends/kimi.ts +++ b/src/backends/kimi.ts @@ -44,7 +44,7 @@ import { resolvePromptMessages, } from './profile-support.js' import { contentToText } from './content.js' -import { hostSpawner } from '../executors/host.js' +import { scopedHostSpawner } from '../executors/scoped-host.js' import type { Spawner } from '../executors/types.js' import { readProcessLines, waitForProcessClose } from './process-lines.js' import { writeStdinPayload } from './stdin-payload.js' @@ -67,7 +67,7 @@ export class KimiBackend implements Backend { constructor(private readonly opts: KimiBackendOptions) { this.name = opts.harness ?? 'kimi-code' this.prefix = `${this.name}/` - this.spawner = opts.spawner ?? hostSpawner + this.spawner = opts.spawner ?? 
scopedHostSpawner } matches(model: string): boolean { diff --git a/src/backends/opencode.ts b/src/backends/opencode.ts index c8420e9..7e80ab2 100644 --- a/src/backends/opencode.ts +++ b/src/backends/opencode.ts @@ -23,7 +23,7 @@ import { assertModeSupported } from '../modes.js' import type { SessionRecord } from '../sessions/store.js' import { materialiseMcpServersForOpencode, resolveMcpServers, resolvePromptMessages } from './profile-support.js' import { contentToText } from './content.js' -import { hostSpawner } from '../executors/host.js' +import { scopedHostSpawner } from '../executors/scoped-host.js' import type { Spawner } from '../executors/types.js' import { readProcessLines, waitForProcessClose } from './process-lines.js' import { writeStdinPayload } from './stdin-payload.js' @@ -40,7 +40,7 @@ export class OpencodeBackend implements Backend { readonly name = 'opencode' private readonly spawner: Spawner constructor(private readonly opts: OpencodeBackendOptions) { - this.spawner = opts.spawner ?? hostSpawner + this.spawner = opts.spawner ?? scopedHostSpawner } matches(model: string): boolean { diff --git a/src/config.ts b/src/config.ts index d4d232f..3eac227 100644 --- a/src/config.ts +++ b/src/config.ts @@ -31,6 +31,11 @@ export interface Config { piBin: string piTimeoutMs: number cliTimeoutMsDefault: number + admission: { + maxActive: number + maxQueue: number + queueTimeoutMs: number + } /** * When set, the `claudish` harness is registered and Claude Code is * spawned with ANTHROPIC_BASE_URL= for `claudish/*` model ids. @@ -139,6 +144,11 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): Config { piBin: env.PI_BIN ?? 'pi', piTimeoutMs: Number.parseInt(env.PI_TIMEOUT_MS ?? 
/** An optionally `+`-signed run of decimal digits and nothing else. */
const DECIMAL_INTEGER = /^\+?\d+$/

/**
 * Strictly parse an integer setting.
 *
 * `Number.parseInt` stops at the first non-digit, so values such as `"12abc"`,
 * `"1.5"` or `"1e3"` would be truncated to 12, 1 and 1 without complaint. This
 * helper accepts only a complete decimal integer (surrounding whitespace is
 * tolerated) and rejects everything else.
 *
 * @param value raw setting; `undefined` or `''` selects `fallback`.
 * @param fallback returned unchanged when the setting is unset.
 * @param min smallest accepted value.
 * @param label wording used in the error message.
 * @throws Error `invalid <label>: <value>` for malformed or out-of-range input.
 */
function parseIntAtLeast(value: string | undefined, fallback: number, min: number, label: string): number {
  if (value === undefined || value === '') return fallback
  const text = value.trim()
  const parsed = DECIMAL_INTEGER.test(text) ? Number(text) : Number.NaN
  if (!Number.isSafeInteger(parsed) || parsed < min) {
    throw new Error(`invalid ${label}: ${value}`)
  }
  return parsed
}

/**
 * Parse a setting that must be an integer >= 1.
 *
 * @returns `fallback` when `value` is `undefined` or empty, else the parsed integer.
 * @throws Error when `value` is not a whole decimal integer >= 1.
 */
function parsePositiveInt(value: string | undefined, fallback: number): number {
  return parseIntAtLeast(value, fallback, 1, 'positive integer')
}

/**
 * Parse a setting that must be an integer >= 0.
 *
 * @returns `fallback` when `value` is `undefined` or empty, else the parsed integer.
 * @throws Error when `value` is not a whole decimal integer >= 0.
 */
function parseNonNegativeInt(value: string | undefined, fallback: number): number {
  return parseIntAtLeast(value, fallback, 0, 'non-negative integer')
}
+ * vitest workers, `pnpm dev` child servers, or test fixtures that + * intentionally install `process.on('SIGTERM', () => {})` and keep + * themselves alive with `setInterval(() => {}, 1000)`. + * + * Production failure mode this addresses (2026-05-22 → 2026-05-23): + * LLM CLIs invoked via cli-bridge ran `pnpm test` inside PR + * review worktrees. The vitest children of those test runs + * detached into their own process groups, survived `killTree()`, + * and accumulated in the cli-bridge.service cgroup. Over ~36 hours + * the bridge's TasksMax saturated (766/768) and every subsequent + * spawn returned EAGAIN. The pr-reviewer aggregator published + * "⚠️ Review Failed — All review passes errored" on every open PR + * across six repos. + * + * Strategy: + * + * For each spawn, ask the user systemd manager to create a + * transient scope under `cli-bridge-llm.slice`: + * + * systemd-run --user --scope --collect --quiet + * --unit=cli-bridge-.scope + * --slice=cli-bridge-llm.slice + * -- + * + * The scope owns its own cgroup. On chat() finally we write `1` + * to the scope's `cgroup.kill` — a Linux 5.14+ kernel feature + * that SIGKILLs every task in the cgroup atomically, regardless + * of pgid manipulation. `--collect` removes the unit once empty. + * + * killTree() still runs first to give the direct child a chance + * to flush stdout and exit cleanly; the cgroup-kill in release() + * is the belt-and-suspenders backstop that catches escapees. + * + * Fallback: + * + * If systemd-run is unavailable (running outside a systemd user + * manager, in a minimal container, etc.) this spawner degrades + * to hostSpawner. Detection is a one-shot synchronous probe at + * module load — cheap and definitive. 
const SLICE = 'cli-bridge-llm.slice'
const DEFAULT_SCOPE_TASKS_MAX = 128
const DEFAULT_SCOPE_MEMORY_MAX = '3G'
const DEFAULT_SCOPE_RUNTIME_MAX_SEC = 7200

/** Paths checked for the systemd-run binary, in order of preference. */
const SYSTEMD_RUN_CANDIDATES: readonly string[] = ['/usr/bin/systemd-run', '/bin/systemd-run']

/**
 * Cached probe result. `undefined` until the first probe; afterwards the path
 * of the systemd-run binary that was found, or `null` when scoping is
 * unavailable.
 */
let systemdRunPath: string | null | undefined

/**
 * Find a systemd-run binary, provided a user systemd manager looks reachable.
 *
 * The manager check is the presence of `$XDG_RUNTIME_DIR/systemd/private`.
 * Nothing is executed; this only inspects the environment and the filesystem.
 */
function locateSystemdRun(): string | null {
  try {
    const xdg = process.env.XDG_RUNTIME_DIR
    if (!xdg || !existsSync(`${xdg}/systemd/private`)) return null
    return SYSTEMD_RUN_CANDIDATES.find((candidate) => existsSync(candidate)) ?? null
  } catch {
    return null
  }
}

/**
 * Report whether spawns can be wrapped in a transient scope. The probe runs
 * once, on the first call, and the answer is cached for later calls.
 */
function probeSystemdRun(): boolean {
  if (systemdRunPath === undefined) systemdRunPath = locateSystemdRun()
  return systemdRunPath !== null
}

/**
 * Resolve the cgroup filesystem directory of `pid` from `/proc/<pid>/cgroup`,
 * using the cgroup v2 entry (the line beginning with `0::`).
 *
 * @returns the directory under `/sys/fs/cgroup`, or `null` when the proc file
 *   cannot be read, has no `0::` line, or the directory does not exist.
 */
function resolveCgroupPath(pid: number): string | null {
  try {
    const raw = readFileSync(`/proc/${pid}/cgroup`, 'utf8')
    // cgroup v2 line format: "0::/user.slice/.../scope-unit.scope"
    const line = raw.split('\n').find((l) => l.startsWith('0::'))
    if (!line) return null
    const abs = `/sys/fs/cgroup${line.slice(3)}`
    return statSync(abs).isDirectory() ? abs : null
  } catch {
    return null
  }
}

/** True when `cgPath` is the cgroup directory of the scope unit `unitName`. */
function isScopeCgroup(cgPath: string, unitName: string): boolean {
  return cgPath.endsWith(`/${unitName}`)
}

/**
 * Kill every task in the scope `unitName`.
 *
 * `pid` is the wrapper process that was started for the scope. Its cgroup is
 * only used when it really is the scope's cgroup: if the wrapper has not yet
 * been moved into the scope, or the pid now belongs to some other process,
 * `/proc/<pid>/cgroup` names a cgroup we must not touch, and writing to its
 * `cgroup.kill` would take down unrelated processes (possibly this one).
 * In every such case we ask systemd to stop the unit by name instead.
 */
async function killCgroup(pid: number, unitName: string): Promise<void> {
  const cgPath = resolveCgroupPath(pid)
  if (cgPath !== null && isScopeCgroup(cgPath, unitName)) {
    try {
      // cgroup.kill (Linux 5.14+) SIGKILLs every task in the cgroup,
      // regardless of process-group changes made by descendants.
      await writeFile(`${cgPath}/cgroup.kill`, '1')
      return
    } catch {
      // fall through to systemctl
    }
  }
  // Fallback: stop the unit through the user manager.
  await new Promise<void>((resolve) => {
    const p = spawn('systemctl', ['--user', '--quiet', 'stop', unitName], {
      stdio: 'ignore',
      detached: true,
    })
    // Don't wait forever on a hung systemctl.
    const watchdog = setTimeout(() => {
      try { p.kill('SIGKILL') } catch {}
      resolve()
    }, 3000)
    watchdog.unref?.()
    const settle = (): void => {
      clearTimeout(watchdog)
      resolve()
    }
    p.on('error', settle)
    p.on('exit', settle)
  })
}

/**
 * Spawner that runs `bin` inside a transient `systemd-run --user --scope`
 * unit under {@link SLICE}, so the whole process tree shares one cgroup.
 *
 * When the probe finds no usable systemd-run, the call is delegated to
 * `hostSpawner` unchanged. A failure to start systemd-run itself is not
 * retried; it is reported through `spawnError()` on the result.
 *
 * `release()` is idempotent and kills whatever is left in the scope. It does
 * not wait for the kill to finish and swallows its errors.
 */
export const scopedHostSpawner: Spawner = async (bin, args, opts) => {
  if (!probeSystemdRun() || !systemdRunPath) {
    return hostSpawner(bin, args, opts)
  }
  const systemdRun = systemdRunPath

  // Unit names must be unique per spawn: bridge pid + 12 random hex chars.
  const unitName = `cli-bridge-${process.pid}-${randomBytes(6).toString('hex')}.scope`
  const tasksMax = positiveIntEnv('CLI_BRIDGE_SCOPE_TASKS_MAX', DEFAULT_SCOPE_TASKS_MAX)
  const runtimeMaxSec = positiveIntEnv('CLI_BRIDGE_SCOPE_RUNTIME_MAX_SEC', DEFAULT_SCOPE_RUNTIME_MAX_SEC)
  const memoryMax = process.env.CLI_BRIDGE_SCOPE_MEMORY_MAX || DEFAULT_SCOPE_MEMORY_MAX

  const wrapped: string[] = [
    '--user',
    '--scope',
    '--collect', // auto-remove the scope unit once empty
    '--quiet',
    `--unit=${unitName}`,
    `--slice=${SLICE}`,
    `--property=TasksMax=${tasksMax}`,
    `--property=MemoryMax=${memoryMax}`,
    `--property=RuntimeMaxSec=${runtimeMaxSec}`,
    '--property=OOMPolicy=stop',
    '--',
    bin,
    ...args,
  ]

  // Use the binary the probe actually found rather than a fixed path.
  const child = spawn(systemdRun, wrapped, {
    stdio: opts.stdio ?? ['ignore', 'pipe', 'pipe'],
    cwd: opts.cwd,
    env: sanitizeHostEnv(opts.env),
    // `detached: true` makes the wrapper a process-group leader, so a
    // process-group kill still reaches it; release() is the hard backstop.
    detached: true,
  })

  let spawnError: Error | null = null
  child.on('error', (err) => { spawnError = err })

  let released = false
  const release = (): void => {
    if (released) return
    released = true
    // Fire-and-forget. Errors are swallowed: the scope may already be gone
    // if the child exited on its own.
    const pid = child.pid
    if (pid !== undefined) {
      void killCgroup(pid, unitName).catch(() => {})
    }
  }

  const result: SpawnResult = {
    child,
    release,
    spawnError: () => spawnError,
  }
  return result
}

/**
 * Read a positive integer from the environment variable `name`.
 *
 * @returns the value when it converts to an integer greater than zero,
 *   otherwise `fallback` (unset, empty, non-numeric, fractional, zero, negative).
 */
function positiveIntEnv(name: string, fallback: number): number {
  const value = Number(process.env[name])
  return Number.isInteger(value) && value > 0 ? value : fallback
}
- let source + let source: AsyncIterable + let admissionLease: AdmissionLease | null = null if (req.execution?.kind === 'sandbox' && backend.name !== 'sandbox') { const sandboxBackend = deps.registry.byName('sandbox') if (!sandboxBackend) { @@ -267,6 +269,13 @@ export function mountChatCompletions( } source = sandboxBackend.chat(delegatedReq, session, ac.signal) } else { + if (deps.admission && shouldApplyHostAdmission(backend.name, req)) { + try { + admissionLease = await deps.admission.acquire(ac.signal) + } catch (err) { + return admissionErrorResponse(c, err) + } + } source = backend.chat(req, session, ac.signal) } @@ -300,6 +309,8 @@ export function mountChatCompletions( } yield { finish_reason: 'error' } satisfies ChatDelta console.error(`[cli-bridge] backend ${backend.name} failed:`, err) + } finally { + admissionLease?.release() } }, } @@ -325,19 +336,45 @@ export function mountChatCompletions( return streamSSE(c, async (stream) => { const meta = makeChunkMeta(req.model) const heartbeatMs = resolveSseHeartbeatMs() + let streamClosed = false + const abortStream = (): void => { + streamClosed = true + ac.abort() + } + const writeRaw = async (chunk: string): Promise => { + if (streamClosed || ac.signal.aborted) return false + try { + await stream.write(chunk) + return true + } catch { + abortStream() + return false + } + } + const writeSse = async (data: string): Promise => { + if (streamClosed || ac.signal.aborted) return false + try { + await stream.writeSSE({ data }) + return true + } catch { + abortStream() + return false + } + } const heartbeat = setInterval(() => { - void stream.write(': keepalive\n\n').catch(() => {}) + void writeRaw(': keepalive\n\n') }, heartbeatMs) try { - await stream.write(': connected\n\n') + if (!await writeRaw(': connected\n\n')) return for await (const delta of wrapped) { + if (ac.signal.aborted) break // Backend-level liveness ping (e.g. 
/**
 * Turn an admission rejection into an HTTP 503 with a `Retry-After: 5` header
 * and a JSON body carrying the reason and the gate snapshot. Any other error
 * is passed on to `errorResponse`.
 */
function admissionErrorResponse(c: Context, err: unknown): Response {
  if (err instanceof AdmissionRejectedError) {
    const { message, reason, snapshot } = err
    c.header('Retry-After', '5')
    const body = {
      error: {
        message,
        type: 'admission_rejected',
        reason,
        admission: snapshot,
      },
    }
    return c.json(body, 503)
  }
  return errorResponse(c, err)
}

/**
 * Decide whether a request must pass the host admission gate: only when the
 * request does not ask for sandbox execution and the backend is neither
 * `sandbox` nor `passthrough`.
 */
function shouldApplyHostAdmission(backendName: string, req: ChatRequest): boolean {
  const wantsSandbox = req.execution?.kind === 'sandbox'
  const exemptBackend = backendName === 'sandbox' || backendName === 'passthrough'
  return !wantsSandbox && !exemptBackend
}
200 : 503) }) diff --git a/src/routes/models.ts b/src/routes/models.ts index a10fdb1..894f34f 100644 --- a/src/routes/models.ts +++ b/src/routes/models.ts @@ -37,7 +37,12 @@ const PI_MODELS: ReadonlyArray<{ id: string; note?: string }> = [ { id: 'deepseek/deepseek-v4-pro', note: 'DeepSeek V4 Pro via pi' }, { id: 'deepseek/deepseek-v4-flash', note: 'DeepSeek V4 Flash via pi' }, { id: 'moonshot/kimi-k2.5', note: 'Moonshot Kimi K2.5 via pi' }, + { id: 'moonshot/kimi-k2.6', note: 'Moonshot Kimi K2.6 via pi' }, { id: 'moonshot/kimi-k2-thinking', note: 'Moonshot Kimi K2 Thinking via pi' }, + { id: 'kimi-for-coding-oauth/kimi-k2.6', note: 'Kimi K2.6 via pi kimi-for-coding-oauth extension' }, + { id: 'openai-codex/gpt-5.3-codex', note: 'GPT-5.3-codex via pi openai-codex extension' }, + { id: 'openai-codex/gpt-5.4', note: 'GPT-5.4 via pi openai-codex extension' }, + { id: 'openai-codex/gpt-5.5', note: 'GPT-5.5 via pi openai-codex extension' }, { id: 'zai-coding-paas/glm-5.1', note: 'GLM 5.1 via pi zai-coding-paas extension' }, { id: 'zai-coding-paas/glm-5-turbo', note: 'GLM 5 Turbo via pi zai-coding-paas extension' }, { id: 'zai-coding-paas/glm-5', note: 'GLM 5 via pi zai-coding-paas extension' }, diff --git a/src/server.ts b/src/server.ts index e5fe079..49f6a2a 100644 --- a/src/server.ts +++ b/src/server.ts @@ -36,6 +36,7 @@ import { ContainerPool } from './executors/container-pool.js' import { createDockerSpawner } from './executors/docker.js' import type { Spawner } from './executors/types.js' import type { BackendExecutorConfig } from './config.js' +import { AdmissionGate } from './admission.js' function parseEnvPositiveInt(name: string, fallback: number): number { const raw = process.env[name] @@ -106,6 +107,7 @@ export async function buildApp(config: Config): Promise<{ const registry = new BackendRegistry() const extras: BuildAppExtras = { shutdownHooks: [] } const catalog = createProfileCatalog(config.sandboxProfilesDir) + const admission = new 
AdmissionGate(config.admission) // Register order matters — first match wins. Harness-specific backends // come first so a `claude-code/sonnet` doesn't get claimed by a @@ -214,11 +216,11 @@ export async function buildApp(config: Config): Promise<{ }) } - mountHealth(app, { registry }) + mountHealth(app, { registry, admission }) mountModels(app, { registry, catalog }) mountSessions(app, { sessions }) mountProfiles(app, { catalog }) - mountChatCompletions(app, { registry, sessions }) + mountChatCompletions(app, { registry, sessions, admission }) mountCadRender(app) mountImagesGenerate(app) mountMetrics(app) @@ -248,7 +250,7 @@ function constantTimeEqual(a: string, b: string): boolean { return acc === 0 } -if (import.meta.url === `file://${process.argv[1]}`) { +export async function startServer(): Promise { const config = loadConfig() const { app, sessions, extras } = await buildApp(config) const server = serve({ @@ -267,6 +269,7 @@ if (import.meta.url === `file://${process.argv[1]}`) { console.log(`[cli-bridge] listening on http://${info.address}:${info.port} (host=${config.host})`) console.log(`[cli-bridge] backends: ${[...config.backends].join(', ')}`) console.log(`[cli-bridge] bearer: ${config.bearer ? 
/** Errno-shaped listen failure text, e.g. "listen EADDRINUSE: address already in use :::3000". */
const LISTEN_ERRNO_MESSAGE = /\blisten E[A-Z]+\b/
const ADDRESS_IN_USE_MESSAGE = /address already in use/i

/**
 * Decide whether an uncaught exception means the HTTP listener could not be
 * set up, in which case the process should exit instead of being kept alive.
 *
 * Matching is deliberately narrow. `EACCES` is also raised by unrelated
 * operations (spawning a non-executable binary, opening a protected file),
 * so it only counts when the failing syscall was `listen` or `bind`. The
 * message fallback requires the errno-shaped `listen E…` form or the phrase
 * "address already in use", so messages that merely contain "listen"
 * (for example "listeners") are not treated as fatal.
 *
 * @param err the uncaught exception.
 * @returns true when the error is a listener start-up failure.
 */
function isFatalServerStartupError(err: Error): boolean {
  const { code, syscall } = err as NodeJS.ErrnoException
  if (code === 'EADDRINUSE') return true
  if (code === 'EACCES' && (syscall === 'listen' || syscall === 'bind')) return true
  return LISTEN_ERRNO_MESSAGE.test(err.message) || ADDRESS_IN_USE_MESSAGE.test(err.message)
}
+35,9 @@ describe('materialiseMcpConfig', () => { expect(materialiseMcpConfig({ name: 'p' } as AgentProfile)).toBeNull() }) - it('returns null when every entry is filtered out (disabled or remote)', () => { + it('returns null when every entry is filtered out', () => { const profile: AgentProfile = { mcp: { - 'remote-http': { transport: 'http', url: 'https://example.com/mcp' }, 'disabled-stdio': { command: '/usr/bin/foo', enabled: false }, }, } @@ -227,7 +226,7 @@ describe('materialiseMcpServersForClaudeKimi', () => { it('writes the canonical {mcpServers:{...}} JSON shape', () => { const m = materialiseMcpServersForClaudeKimi({ echo: { command: 'node', args: ['./echo.js'], env: { FOO: 'bar' }, timeout: 5000 }, - remote: { type: 'http', url: 'https://example.com' }, // dropped + remote: { type: 'http', url: 'https://example.com', headers: { Authorization: 'Bearer X' } }, }) expect(m).not.toBeNull() if (!m) return diff --git a/tests/scoped-host.test.ts b/tests/scoped-host.test.ts new file mode 100644 index 0000000..206d096 --- /dev/null +++ b/tests/scoped-host.test.ts @@ -0,0 +1,172 @@ +/** + * Adversarial tests for scopedHostSpawner. + * + * Each test pins a specific regression from the 2026-05-22→05-23 + * cli-bridge.service incident where LLM-invoked test fixtures leaked + * into the bridge cgroup and exhausted TasksMax (766/768), causing + * every PR-reviewer run to publish "⚠️ Review Failed". + * + * These tests run against the REAL host systemd-user-manager — no + * mocks, no stubs. Skipped automatically on machines without + * systemd-run + a user manager (Docker CI, macOS). 
/**
 * Return the cgroup v2 path of `pid` (the text after `0::` in
 * `/proc/<pid>/cgroup`), or null when the file is unreadable or has no such line.
 */
function cgroupOf(pid: number): string | null {
  let contents: string
  try {
    contents = readFileSync(`/proc/${pid}/cgroup`, 'utf8')
  } catch {
    return null
  }
  for (const entry of contents.split('\n')) {
    if (entry.startsWith('0::')) return entry.slice(3)
  }
  return null
}

/**
 * Poll `pred` every `stepMs` until it returns true or `timeoutMs` has elapsed.
 * After the deadline the predicate is evaluated one final time and that
 * result is returned.
 */
async function waitUntil(pred: () => boolean, timeoutMs: number, stepMs = 50): Promise<boolean> {
  const deadline = Date.now() + timeoutMs
  for (;;) {
    if (Date.now() >= deadline) return pred()
    if (pred()) return true
    await sleep(stepMs)
  }
}
+ return c !== null && c.includes('cli-bridge-llm.slice') + }, + 2000, + ) + expect(cgroup, `process is not in cli-bridge-llm.slice; cgroup=${cgroupOf(r.child.pid!)}`) + .toBe(true) + } finally { + r.release() + await killTree(r.child) + } + }) + + it('reaps a SIGTERM-ignoring descendant via cgroup.kill', async () => { + // Reproduces the leak from the incident: a grandchild that + // installs `process.on('SIGTERM', () => {})` and keeps itself + // alive with a setInterval. pgid-based kill cannot reach it + // because the harness layer between it and the bridge has + // setsid'd into a new group. Only cgroup.kill works. + // + // We model the harness as `sh -c` spawning a backgrounded node + // process that intentionally: + // 1. ignores SIGTERM + // 2. starts a new session (setsid via Node `detached: true` is + // simulated here by passing the daemonised pid back via stdout) + // 3. keeps itself alive via setInterval + // The parent `sh` exits as soon as the child is spawned, so the + // grandchild is reparented to PID 1 if not contained by cgroup. + const script = ` + node -e " + process.on('SIGTERM', () => {}); + process.stdout.write(String(process.pid) + '\\n'); + setInterval(() => {}, 1000); + " & + child=$! + # Detach: close stdin/stdout/stderr of the parent shell so it + # exits, leaving the node grandchild orphaned-to-init unless + # cgroup contains it. + disown $child + # Print the grandchild pid then exit so the wrapper sees EOF. + sleep 0.5 + ` + const r = await scopedHostSpawner('/bin/sh', ['-c', script], { + stdio: ['ignore', 'pipe', 'pipe'], + }) + + let grandchildPid = 0 + r.child.stdout?.on('data', (b) => { + const m = b.toString().match(/(\d+)/) + if (m) grandchildPid = Number(m[1]) + }) + + // Wait for the grandchild to print its pid. 
+ await waitUntil(() => grandchildPid > 0, 3000) + expect(grandchildPid, 'grandchild did not report its pid').toBeGreaterThan(0) + + // Sanity: the grandchild IS in our scope's cgroup despite being + // backgrounded and `disown`ed. + const gcCgroup = cgroupOf(grandchildPid) + expect(gcCgroup, `grandchild cgroup=${gcCgroup}`).toMatch(/cli-bridge-llm\.slice/) + + // Sanity: it really ignores SIGTERM. + try { process.kill(grandchildPid, 'SIGTERM') } catch {} + await sleep(300) + expect(() => process.kill(grandchildPid, 0), 'grandchild died to SIGTERM — fixture broken').not.toThrow() + + // Now the real test: release() should reap the entire cgroup + // via cgroup.kill, including the SIGTERM-ignoring grandchild. + r.release() + await killTree(r.child) + + const reaped = await waitUntil(() => { + try { process.kill(grandchildPid, 0); return false } catch { return true } + }, 3000) + expect(reaped, `grandchild pid=${grandchildPid} survived release()`).toBe(true) + }) + + it('release() is idempotent', async () => { + const r = await scopedHostSpawner('/bin/sleep', ['1'], { + stdio: ['ignore', 'pipe', 'pipe'], + }) + r.release() + expect(() => r.release()).not.toThrow() + await killTree(r.child) + }) + + it('does not leave scope units after the spawn completes', async () => { + const r = await scopedHostSpawner('/bin/true', [], { + stdio: ['ignore', 'pipe', 'pipe'], + }) + await new Promise((resolve) => r.child.on('exit', () => resolve())) + r.release() + + // The `--collect` flag removes the unit once empty. Give systemd + // a beat to garbage-collect, then confirm nothing under our + // slice references this PID. 
+ await sleep(500) + const sliceCgroup = + '/sys/fs/cgroup/user.slice/user-1000.slice/user@1000.service' + + '/cli.slice/cli-bridge.slice/cli-bridge-llm.slice' + if (existsSync(sliceCgroup)) { + const remaining = readdirSync(sliceCgroup).filter((n) => n.endsWith('.scope')) + // Other tests may have concurrent scopes; we only assert OUR + // pid is gone, not that the slice is empty. + for (const scope of remaining) { + try { + const procs = readFileSync(`${sliceCgroup}/${scope}/cgroup.procs`, 'utf8').trim() + expect(procs, `our pid still in ${scope}`).not.toContain(String(r.child.pid)) + } catch { + // scope may have just been collected — race is benign + } + } + } + }) +}) diff --git a/tests/smoke.test.ts b/tests/smoke.test.ts index c299d12..3727300 100644 --- a/tests/smoke.test.ts +++ b/tests/smoke.test.ts @@ -26,6 +26,7 @@ import { mountSessions } from '../src/routes/sessions.js' import { mountHealth } from '../src/routes/health.js' import { mountModels } from '../src/routes/models.js' import { contentToText } from '../src/backends/content.js' +import { AdmissionGate } from '../src/admission.js' class FakeBackend implements Backend { constructor(readonly name: string) {} @@ -58,6 +59,31 @@ class DelayedBackend extends FakeBackend { } } +class BlockingBackend extends FakeBackend { + started = 0 + private releases: Array<() => void> = [] + + override async *chat(req: ChatRequest, session: SessionRecord | null): AsyncIterable { + this.started += 1 + await new Promise((resolve) => this.releases.push(resolve)) + yield* super.chat(req, session) + } + + releaseOne(): void { + const release = this.releases.shift() + if (release) release() + } +} + +async function waitFor(predicate: () => boolean, timeoutMs = 1000): Promise { + const deadline = Date.now() + timeoutMs + while (Date.now() < deadline) { + if (predicate()) return + await new Promise((resolve) => setTimeout(resolve, 5)) + } + throw new Error('waitFor timed out') +} + describe('SessionStore', () => { let dir: 
string
   let store: SessionStore
@@ -352,6 +378,56 @@ describe('POST /v1/chat/completions', () => {
     expect(res.status).toBe(404)
   })
 
+  it('fails closed before spawning when host-chat admission queue is full', async () => {
+    const backend = new BlockingBackend('claude')
+    const admission = new AdmissionGate({ maxActive: 1, maxQueue: 1, queueTimeoutMs: 10_000 }) // room for 1 running + 1 queued
+    app = new Hono()
+    mountChatCompletions(app, {
+      registry: new BackendRegistry().register(backend),
+      sessions,
+      admission,
+    })
+
+    const body = JSON.stringify({
+      model: 'claude',
+      messages: [{ role: 'user', content: 'x' }],
+      stream: false,
+    })
+    const first = app.request('/v1/chat/completions', { // deliberately not awaited: stays parked in BlockingBackend
+      method: 'POST',
+      headers: { 'content-type': 'application/json' },
+      body,
+    })
+    await waitFor(() => backend.started === 1) // request #1 has reached the backend
+
+    const second = app.request('/v1/chat/completions', { // not awaited: expected to wait in the admission queue
+      method: 'POST',
+      headers: { 'content-type': 'application/json' },
+      body,
+    })
+    await waitFor(() => admission.snapshot().queued === 1) // request #2 is queued, so the queue is now full
+
+    const third = await app.request('/v1/chat/completions', {
+      method: 'POST',
+      headers: { 'content-type': 'application/json' },
+      body,
+    })
+    expect(third.status).toBe(503) // rejected instead of queued
+    const rejected = await third.json() as { error: { type: string; reason: string; admission: { active: number; queued: number } } }
+    expect(rejected.error.type).toBe('admission_rejected')
+    expect(rejected.error.reason).toBe('queue_full')
+    expect(rejected.error.admission.active).toBe(1)
+    expect(rejected.error.admission.queued).toBe(1)
+    expect(backend.started).toBe(1) // the rejected request never reached the backend
+
+    backend.releaseOne() // unblock request #1
+    expect((await first).status).toBe(200)
+    await waitFor(() => backend.started === 2) // request #2 leaves the queue once #1 has finished
+    backend.releaseOne() // unblock request #2
+    expect((await second).status).toBe(200)
+    expect(admission.snapshot()).toMatchObject({ active: 0, queued: 0 }) // nothing left active or queued
+  })
+
   it('returns 400 on malformed JSON', async () => {
     const res = await app.request('/v1/chat/completions', {
       method: 'POST',
@@ -659,8 +735,8 @@ describe('ClaudeBackend stdin payload + buildArgs', () => {
     const args = b.buildArgs(baseReq, null, 'byob')
     expect(args).toContain('--dangerously-skip-permissions')
     // And must NOT carry hosted-safe's plan/disallowed-tools baggage
-    expect(args).not.toContain('plan')
     expect(args).not.toContain('--permission-mode')
+    expect(args).not.toContain('plan')
     expect(args).not.toContain('--disallowed-tools')
   })
 
diff --git a/vitest.config.ts b/vitest.config.ts
new file mode 100644
index 0000000..200d167
--- /dev/null
+++ b/vitest.config.ts
@@ -0,0 +1,13 @@
+import { configDefaults, defineConfig } from 'vitest/config'
+
+export default defineConfig({
+  test: {
+    // `test.exclude` replaces Vitest's built-in list rather than extending it, so spread
+    // the defaults (node_modules, .git, …) instead of re-listing them by hand.
+    exclude: [
+      ...configDefaults.exclude,
+      // NOTE(review): presumably agent worktree checkouts that duplicate the test suite — confirm
+      '**/.claude/worktrees/**',
+    ],
+  },
+})