Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/desktop/src/ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ export function registerOpenAliceIpc(opts: OpenAliceIpcOptions): void {
cols: typeof body['cols'] === 'number' ? body['cols'] : 80,
rows: typeof body['rows'] === 'number' ? body['rows'] : 24,
since: typeof body['since'] === 'number' ? body['since'] : undefined,
controllerId: typeof body['controllerId'] === 'string' ? body['controllerId'] : undefined,
controllerKind: typeof body['controllerKind'] === 'string' ? body['controllerKind'] : 'electron',
takeover: body['takeover'] === true,
})
})

Expand Down
10 changes: 9 additions & 1 deletion apps/desktop/src/preload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,15 @@ const api = {
ipcRenderer.invoke('openalice:workspace:read-file', input),
},
pty: {
connect: (input: { sessionId: string; cols: number; rows: number; since?: number }) => {
connect: (input: {
sessionId: string
cols: number
rows: number
since?: number
controllerId?: string
controllerKind?: string
takeover?: boolean
}) => {
const connectionId = randomId()
listenersFor(connectionId)
// Keep the Electron transport on ordinary ipcRenderer events instead of
Expand Down
10 changes: 10 additions & 0 deletions scripts/guardian/dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ async function main(): Promise<void> {
prefixLogs: true,
})

const aliceReady = await waitForHttp(`http://127.0.0.1:${ports.webPort}/api/version`, { timeoutMs: 20_000 })
if (!aliceReady) {
console.error(`[guardian] Alice failed to come up within 20s — aborting before Vite starts`)
console.error(`[guardian] If another OpenAlice/Electron instance is running on the same data root, stop it or run dev with an isolated OPENALICE_HOME/AQ_LAUNCHER_ROOT.`)
try { alice.kill('SIGTERM') } catch { /* noop */ }
try { uta.process.kill('SIGTERM') } catch { /* noop */ }
process.exit(1)
}
console.log(`[guardian] Alice ready`)

// ── Vite ──────────────────────────────────────────────────
const vite: ChildProcess = spawnChild({
name: 'vite',
Expand Down
25 changes: 22 additions & 3 deletions src/webui/workspaces-ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@ const MSG_SERVER = 'openalice:pty:server-message'
const MSG_SERVER_CLOSE = 'openalice:pty:server-close'

type BridgeMessage =
| { type: typeof MSG_CONNECT; connectionId: string; sessionId: string; cols: number; rows: number; since?: number }
| {
type: typeof MSG_CONNECT
connectionId: string
sessionId: string
cols: number
rows: number
since?: number
controllerId?: string
controllerKind?: string
takeover?: boolean
}
| { type: typeof MSG_CLIENT; connectionId: string; binary: boolean; data: unknown }
| { type: typeof MSG_CLIENT_CLOSE; connectionId: string }

Expand Down Expand Up @@ -106,8 +116,17 @@ export function attachWorkspacesIpc(svc: WorkspaceService): AttachedWorkspaceIpc
const cols = clamp(msg.cols, 80, 1, 1000)
const rows = clamp(msg.rows, 24, 1, 1000)
const since = typeof msg.since === 'number' && Number.isFinite(msg.since) && msg.since >= 0 ? msg.since : undefined
const ok = svc.pool.attachById(sessionId, socket as unknown as WebSocket, cols, rows, since)
if (!ok) socket.close(4404, 'session not found')
const controllerId = typeof msg.controllerId === 'string' ? msg.controllerId.slice(0, 128) : ''
const controllerKind = typeof msg.controllerKind === 'string' ? msg.controllerKind.slice(0, 32) : 'electron'
const result = svc.pool.attachById(
sessionId,
socket as unknown as WebSocket,
cols,
rows,
since,
controllerId ? { controllerId, controllerKind, takeover: msg.takeover === true } : undefined,
)
if (!result.ok && result.reason === 'missing') socket.close(4404, 'session not found')
launcherLogger.event('ipc_pty.attached', { connectionId, sessionId, cols, rows })
console.log(`ipc pty attached: session=${sessionId} connection=${connectionId} size=${cols}x${rows}`)
return
Expand Down
24 changes: 23 additions & 1 deletion src/webui/workspaces-ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ export function attachWorkspacesWS(httpServer: HttpServer, svc: WorkspaceService
const rows = clampQuery(url.searchParams.get('rows'), 24, 1, 1000);
const sinceRaw = url.searchParams.get('since');
const since = sinceRaw === null ? undefined : parseSince(sinceRaw);
const controllerId = cleanToken(url.searchParams.get('client'), 128);
const controllerKind = cleanToken(url.searchParams.get('kind'), 32) ?? 'web';
const takeover = url.searchParams.get('takeover') === '1';

const sessionId = (url.searchParams.get('session') ?? '').slice(0, 64);
if (!sessionId) {
Expand All @@ -139,6 +142,9 @@ export function attachWorkspacesWS(httpServer: HttpServer, svc: WorkspaceService
cols,
rows,
since: since ?? null,
controllerId: controllerId ?? null,
controllerKind,
takeover,
origin: req.headers.origin ?? null,
// Host the browser actually connected to. Discriminates the dev
// transport: `localhost:<backendPort>` = direct (proxy bypassed),
Expand All @@ -149,7 +155,17 @@ export function attachWorkspacesWS(httpServer: HttpServer, svc: WorkspaceService
remoteAddress: req.socket.remoteAddress ?? null,
});
try {
svc.pool.attachById(sessionId, ws, cols, rows, since);
const result = svc.pool.attachById(
sessionId,
ws,
cols,
rows,
since,
controllerId ? { controllerId, controllerKind, takeover } : undefined,
);
if (!result.ok && result.reason === 'missing') {
try { ws.close(4404, 'session not found'); } catch { /* ignore */ }
}
} catch (err) {
launcherLogger.error('pool.attach_failed', { sessionId, err });
try { ws.close(1011, 'attach failed'); } catch { /* ignore */ }
Expand Down Expand Up @@ -213,3 +229,9 @@ function parseSince(raw: string): number | undefined {
if (!Number.isFinite(n) || n < 0) return undefined;
return n;
}

function cleanToken(raw: string | null, max: number): string | undefined {
if (raw === null) return undefined;
const value = raw.trim().slice(0, max);
return value.length > 0 ? value : undefined;
}
63 changes: 63 additions & 0 deletions src/workspaces/persistent-session.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,66 @@ describe('PersistentSession backpressure / socket-drop deadlock', () => {
session.dispose('test');
});
});

describe('PersistentSession controller lease', () => {
let term: ReturnType<typeof makeFakeTerm>;

beforeEach(() => {
term = makeFakeTerm();
mockSpawn.mockReturnValue(term as unknown as pty.IPty);
});

afterEach(() => {
vi.clearAllMocks();
});

it('rejects a second controller without kicking the current controller', () => {
const session = new PersistentSession(makeOptions());
const ws1 = new FakeWs();
const ws2 = new FakeWs();

const first = session.attach(ws1 as never, 80, 24, undefined, {
controllerId: 'web:tab-a',
controllerKind: 'web',
});
expect(first.ok).toBe(true);

const second = session.attach(ws2 as never, 80, 24, undefined, {
controllerId: 'web:tab-b',
controllerKind: 'web',
});

expect(second).toEqual({
ok: false,
reason: 'locked',
owner: { id: 'web:tab-a', kind: 'web' },
});
expect(ws1.close).not.toHaveBeenCalled();
expect(ws2.close).toHaveBeenCalledWith(4409, 'session locked by another controller');

session.dispose('test');
});

it('allows an explicit takeover and kicks the previous controller', () => {
const session = new PersistentSession(makeOptions());
const ws1 = new FakeWs();
const ws2 = new FakeWs();

expect(session.attach(ws1 as never, 80, 24, undefined, {
controllerId: 'web:tab-a',
controllerKind: 'web',
}).ok).toBe(true);

const takeover = session.attach(ws2 as never, 80, 24, undefined, {
controllerId: 'telegram:chat-1',
controllerKind: 'telegram',
takeover: true,
});

expect(takeover.ok).toBe(true);
expect(ws1.close).toHaveBeenCalledWith(4001, 'kicked by new attach');
expect(ws2.close).not.toHaveBeenCalled();

session.dispose('test');
});
});
58 changes: 55 additions & 3 deletions src/workspaces/persistent-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ export interface PersistentSessionOptions {
readonly initialReplayBytes?: Buffer;
}

export interface SessionControllerClaim {
readonly controllerId: string;
readonly controllerKind: string;
/** Explicit user/operator action: replace the current controller. */
readonly takeover?: boolean;
}

export interface SessionControllerOwner {
readonly id: string;
readonly kind: string;
}

export type SessionAttachResult =
| { readonly ok: true }
| { readonly ok: false; readonly reason: 'locked'; readonly owner: SessionControllerOwner };

const MAX_DIM = 1000;
const CURSOR_TICK_MS = 2000;
const CURSOR_BYTES_INTERVAL = 64 * 1024;
Expand Down Expand Up @@ -71,6 +87,7 @@ export class PersistentSession {
private messageHandler: ((raw: unknown, isBinary: boolean) => void) | null = null;
private closeHandler: (() => void) | null = null;
private errorHandler: (() => void) | null = null;
private controller: SessionControllerOwner | null = null;
private currentCols: number;
private currentRows: number;
private respawnTimes: number[] = [];
Expand Down Expand Up @@ -281,15 +298,40 @@ export class PersistentSession {
this.log.info('session.agent_id_detected', { agentSessionId: id });
}

/** Swap in `ws` as the attached client; kick the previous one if any. */
attach(ws: WebSocket, cols: number, rows: number, since: number | undefined): void {
/** Swap in `ws` as the attached client. A controller claim makes ownership
* explicit: a second controller is rejected unless it deliberately takes
* over. This is the shared rule for browser, Electron IPC, future IM bridges,
* and any debug client that can write to the PTY. */
attach(
ws: WebSocket,
cols: number,
rows: number,
since: number | undefined,
claim?: SessionControllerClaim,
): SessionAttachResult {
if (this.disposed) {
try {
ws.close(1011, 'session disposed');
} catch {
// ignore
}
return;
return { ok: false, reason: 'locked', owner: this.controller ?? { id: 'disposed', kind: 'session' } };
}

const owner = normalizeClaim(claim);
if (
owner &&
this.ws !== null &&
this.controller !== null &&
this.controller.id !== owner.id &&
!claim?.takeover
) {
try {
ws.close(4409, 'session locked by another controller');
} catch {
// ignore
}
return { ok: false, reason: 'locked', owner: this.controller };
}

// Kick previous client.
Expand All @@ -305,6 +347,7 @@ export class PersistentSession {
}

this.ws = ws;
this.controller = owner;
// A previous client may have dropped mid-backpressure, leaving the PTY
// paused at the OS level. Clearing only the flag (not resuming the term)
// would strand it paused forever; resumePty() un-sticks both.
Expand Down Expand Up @@ -345,14 +388,17 @@ export class PersistentSession {
replayFromSeq: slice.effectiveSeq,
replayBytes: slice.bytes.length,
scrollbackTruncated,
controller: this.controller,
});
return { ok: true };
}

/** Drop the current client without killing the PTY. */
detach(): void {
if (this.ws === null) return;
const ws = this.ws;
this.ws = null;
this.controller = null;
this.unwireWs(ws);
// No consumer left — let the PTY run free into the in-memory ring buffer
// (onPtyData appends regardless of socket). If the socket dropped while
Expand Down Expand Up @@ -387,6 +433,7 @@ export class PersistentSession {
if (ws !== null) {
this.unwireWs(ws);
this.ws = null;
this.controller = null;
try {
ws.close(1000, `disposed: ${reason}`);
} catch {
Expand Down Expand Up @@ -543,6 +590,11 @@ export class PersistentSession {
}
}

function normalizeClaim(claim: SessionControllerClaim | undefined): SessionControllerOwner | null {
if (!claim?.controllerId) return null;
return { id: claim.controllerId, kind: claim.controllerKind || 'unknown' };
}

function toBuffer(raw: unknown): Buffer | null {
if (Buffer.isBuffer(raw)) return raw;
if (raw instanceof ArrayBuffer) return Buffer.from(raw);
Expand Down
66 changes: 66 additions & 0 deletions src/workspaces/process-lock.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { mkdtemp, readFile, rm } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'

import { afterEach, beforeEach, describe, expect, it } from 'vitest'

import {
WorkspaceRootLockedError,
acquireWorkspaceProcessLock,
} from './process-lock.js'

let root: string

beforeEach(async () => {
root = await mkdtemp(join(tmpdir(), 'oa-root-lock-'))
})

afterEach(async () => {
await rm(root, { recursive: true, force: true })
})

describe('workspace process root lock', () => {
it('prevents two Alice backends from owning the same launcher root', async () => {
const first = await acquireWorkspaceProcessLock(root, {
owner: { pid: 111, hostname: 'host-a' },
isProcessAlive: () => true,
})

await expect(
acquireWorkspaceProcessLock(root, {
owner: { pid: 222, hostname: 'host-b' },
isProcessAlive: () => true,
}),
).rejects.toBeInstanceOf(WorkspaceRootLockedError)

await first.release()
const second = await acquireWorkspaceProcessLock(root, {
owner: { pid: 222, hostname: 'host-b' },
isProcessAlive: () => true,
})
await second.release()
})

it('reclaims a stale lock when the recorded owner process is gone', async () => {
const stale = await acquireWorkspaceProcessLock(root, {
owner: { pid: 111, hostname: 'host-a' },
isProcessAlive: () => true,
})

const fresh = await acquireWorkspaceProcessLock(root, {
owner: { pid: 222, hostname: 'host-b' },
isProcessAlive: (pid) => pid !== 111,
})

const owner = JSON.parse(await readFile(join(root, 'state', 'runtime.lock', 'owner.json'), 'utf8')) as {
pid: number
hostname: string
}
expect(owner.pid).toBe(222)
expect(owner.hostname).toBe('host-b')

// The stale handle must not remove the fresh owner when it later releases.
await stale.release()
await fresh.release()
})
})
Loading
Loading