Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions scripts/service-entry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env tsx
/**
* systemd entrypoint for cli-bridge.
*
* Keep the service command line distinct from application dev servers such as
* `tsx src/server.ts`. Some ADC cleanup paths use broad process patterns for
* dev servers; running the bridge through this wrapper keeps the long-lived
* shared bridge out of that blast radius while preserving the normal server
* startup path.
*/

import { startServer } from '../src/server.js'

await startServer()
139 changes: 139 additions & 0 deletions src/admission.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
export interface AdmissionSnapshot {
active: number
queued: number
maxActive: number
maxQueue: number
}

export interface AdmissionLease {
release(): void
}

export class AdmissionRejectedError extends Error {
constructor(
message: string,
public readonly reason: 'queue_full' | 'queue_timeout' | 'aborted',
public readonly snapshot: AdmissionSnapshot,
) {
super(message)
this.name = 'AdmissionRejectedError'
}
}

interface Waiter {
resolve: (lease: AdmissionLease) => void
reject: (err: AdmissionRejectedError) => void
signal?: AbortSignal
timer?: ReturnType<typeof setTimeout>
onAbort?: () => void
}

export interface AdmissionGateOptions {
maxActive: number
maxQueue: number
queueTimeoutMs: number
}

export class AdmissionGate {
private active = 0
private readonly waiters: Waiter[] = []

constructor(private readonly opts: AdmissionGateOptions) {
if (!Number.isInteger(opts.maxActive) || opts.maxActive < 1) {
throw new Error(`invalid maxActive: ${opts.maxActive}`)
}
if (!Number.isInteger(opts.maxQueue) || opts.maxQueue < 0) {
throw new Error(`invalid maxQueue: ${opts.maxQueue}`)
}
if (!Number.isInteger(opts.queueTimeoutMs) || opts.queueTimeoutMs < 0) {
throw new Error(`invalid queueTimeoutMs: ${opts.queueTimeoutMs}`)
}
}

snapshot(): AdmissionSnapshot {
return {
active: this.active,
queued: this.waiters.length,
maxActive: this.opts.maxActive,
maxQueue: this.opts.maxQueue,
}
}

acquire(signal?: AbortSignal): Promise<AdmissionLease> {
if (signal?.aborted) {
return Promise.reject(this.rejected('admission aborted before queueing', 'aborted'))
}

if (this.active < this.opts.maxActive) {
this.active += 1
return Promise.resolve(this.makeLease())
}

if (this.waiters.length >= this.opts.maxQueue) {
return Promise.reject(this.rejected('cli-bridge is saturated: admission queue is full', 'queue_full'))
}

return new Promise<AdmissionLease>((resolve, reject) => {
const waiter: Waiter = { resolve, reject, signal }
waiter.onAbort = () => {
this.removeWaiter(waiter)
reject(this.rejected('cli-bridge admission aborted while queued', 'aborted'))
}
if (signal) {
signal.addEventListener('abort', waiter.onAbort, { once: true })
}
if (this.opts.queueTimeoutMs > 0) {
waiter.timer = setTimeout(() => {
this.removeWaiter(waiter)
reject(this.rejected(`cli-bridge admission timed out after ${this.opts.queueTimeoutMs}ms`, 'queue_timeout'))
}, this.opts.queueTimeoutMs)
waiter.timer.unref?.()
}
this.waiters.push(waiter)
})
}

private makeLease(): AdmissionLease {
let released = false
return {
release: () => {
if (released) return
released = true
this.release()
},
}
}

private release(): void {
if (this.active > 0) this.active -= 1
while (this.waiters.length > 0 && this.active < this.opts.maxActive) {
const next = this.waiters.shift()
if (!next) return
this.cleanup(next)
if (next.signal?.aborted) {
next.reject(this.rejected('cli-bridge admission aborted while queued', 'aborted'))
continue
}
this.active += 1
next.resolve(this.makeLease())
return
}
}

private removeWaiter(waiter: Waiter): void {
const idx = this.waiters.indexOf(waiter)
if (idx !== -1) this.waiters.splice(idx, 1)
this.cleanup(waiter)
}

private cleanup(waiter: Waiter): void {
if (waiter.timer) clearTimeout(waiter.timer)
if (waiter.signal && waiter.onAbort) {
waiter.signal.removeEventListener('abort', waiter.onAbort)
}
}

private rejected(message: string, reason: AdmissionRejectedError['reason']): AdmissionRejectedError {
return new AdmissionRejectedError(message, reason, this.snapshot())
}
}
4 changes: 2 additions & 2 deletions src/backends/claude.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
type MaterialisedMcpConfig,
} from './profile-support.js'
import { contentToText } from './content.js'
import { hostSpawner } from '../executors/host.js'
import { scopedHostSpawner } from '../executors/scoped-host.js'
import type { Spawner } from '../executors/types.js'
import { readProcessLines, waitForProcessClose } from './process-lines.js'
import { writeStdinPayload } from './stdin-payload.js'
Expand Down Expand Up @@ -103,7 +103,7 @@ export class ClaudeBackend implements Backend {
this.timeoutMs = opts.timeoutMs
this.anthropicBaseUrl = opts.anthropicBaseUrl ?? null
this.prefix = `${this.name}/`
this.spawner = opts.spawner ?? hostSpawner
this.spawner = opts.spawner ?? scopedHostSpawner
}

matches(model: string): boolean {
Expand Down
4 changes: 2 additions & 2 deletions src/backends/codex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import {
resolvePromptMessages,
} from './profile-support.js'
import { contentToText } from './content.js'
import { hostSpawner } from '../executors/host.js'
import { scopedHostSpawner } from '../executors/scoped-host.js'
import type { Spawner } from '../executors/types.js'
import { readProcessLines, waitForProcessClose } from './process-lines.js'
import { killTree } from '../executors/process-tree.js'
Expand All @@ -52,7 +52,7 @@ export class CodexBackend implements Backend {
readonly name = 'codex'
private readonly spawner: Spawner
constructor(private readonly opts: CodexBackendOptions) {
this.spawner = opts.spawner ?? hostSpawner
this.spawner = opts.spawner ?? scopedHostSpawner
}

matches(model: string): boolean {
Expand Down
4 changes: 2 additions & 2 deletions src/backends/kimi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import {
resolvePromptMessages,
} from './profile-support.js'
import { contentToText } from './content.js'
import { hostSpawner } from '../executors/host.js'
import { scopedHostSpawner } from '../executors/scoped-host.js'
import type { Spawner } from '../executors/types.js'
import { readProcessLines, waitForProcessClose } from './process-lines.js'
import { writeStdinPayload } from './stdin-payload.js'
Expand All @@ -67,7 +67,7 @@ export class KimiBackend implements Backend {
constructor(private readonly opts: KimiBackendOptions) {
this.name = opts.harness ?? 'kimi-code'
this.prefix = `${this.name}/`
this.spawner = opts.spawner ?? hostSpawner
this.spawner = opts.spawner ?? scopedHostSpawner
}

matches(model: string): boolean {
Expand Down
4 changes: 2 additions & 2 deletions src/backends/opencode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { assertModeSupported } from '../modes.js'
import type { SessionRecord } from '../sessions/store.js'
import { materialiseMcpServersForOpencode, resolveMcpServers, resolvePromptMessages } from './profile-support.js'
import { contentToText } from './content.js'
import { hostSpawner } from '../executors/host.js'
import { scopedHostSpawner } from '../executors/scoped-host.js'
import type { Spawner } from '../executors/types.js'
import { readProcessLines, waitForProcessClose } from './process-lines.js'
import { writeStdinPayload } from './stdin-payload.js'
Expand All @@ -40,7 +40,7 @@ export class OpencodeBackend implements Backend {
readonly name = 'opencode'
private readonly spawner: Spawner
constructor(private readonly opts: OpencodeBackendOptions) {
this.spawner = opts.spawner ?? hostSpawner
this.spawner = opts.spawner ?? scopedHostSpawner
}

matches(model: string): boolean {
Expand Down
28 changes: 28 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ export interface Config {
piBin: string
piTimeoutMs: number
cliTimeoutMsDefault: number
admission: {
maxActive: number
maxQueue: number
queueTimeoutMs: number
}
/**
* When set, the `claudish` harness is registered and Claude Code is
* spawned with ANTHROPIC_BASE_URL=<this> for `claudish/*` model ids.
Expand Down Expand Up @@ -139,6 +144,11 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): Config {
piBin: env.PI_BIN ?? 'pi',
piTimeoutMs: Number.parseInt(env.PI_TIMEOUT_MS ?? String(defaultTimeout), 10),
cliTimeoutMsDefault: defaultTimeout,
admission: {
maxActive: parsePositiveInt(env.BRIDGE_HOST_CHAT_MAX_ACTIVE, 24),
maxQueue: parseNonNegativeInt(env.BRIDGE_HOST_CHAT_MAX_QUEUE, 64),
queueTimeoutMs: parseNonNegativeInt(env.BRIDGE_HOST_CHAT_QUEUE_TIMEOUT_MS, 30_000),
},
claudishUrl: env.CLAUDISH_URL?.trim() || null,
openaiApiKey: env.OPENAI_API_KEY?.trim() || null,
anthropicApiKey: env.ANTHROPIC_API_KEY?.trim() || null,
Expand All @@ -152,6 +162,24 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): Config {
}
}

function parsePositiveInt(value: string | undefined, fallback: number): number {
if (value === undefined || value === '') return fallback
const parsed = Number.parseInt(value, 10)
if (!Number.isInteger(parsed) || parsed < 1) {
throw new Error(`invalid positive integer: ${value}`)
}
return parsed
}

function parseNonNegativeInt(value: string | undefined, fallback: number): number {
if (value === undefined || value === '') return fallback
const parsed = Number.parseInt(value, 10)
if (!Number.isInteger(parsed) || parsed < 0) {
throw new Error(`invalid non-negative integer: ${value}`)
}
return parsed
}

/**
* Per-backend executor defaults. All subprocess backends share the
* same default runtime image (`cli-bridge-cli-runtime`) — that image
Expand Down
Loading