diff --git a/pi/skills/debug-agent/debug-dashboard.ts b/pi/skills/debug-agent/debug-dashboard.ts index 683aaa8..19242cb 100644 --- a/pi/skills/debug-agent/debug-dashboard.ts +++ b/pi/skills/debug-agent/debug-dashboard.ts @@ -62,14 +62,15 @@ interface DashboardData { baudbotSha: string | null; bridgeUp: boolean; bridgeType: string | null; - sessions: { name: string; alive: boolean }[]; + bridgeUptimeMs: number | null; + sessions: { name: string; alive: boolean; uptimeMs: number | null }[]; devAgentCount: number; devAgentNames: string[]; todosInProgress: number; todosDone: number; todosTotal: number; worktreeCount: number; - uptimeMs: number; + serviceUptimeMs: number | null; lastRefresh: Date; heartbeat: HeartbeatInfo; lastEvent: LastEvent | null; @@ -142,6 +143,37 @@ function detectBridgeType(): string | null { } } +function getBridgeUptime(): number | null { + try { + const out = execSync("ps -eo etime,cmd 2>/dev/null | grep -E 'broker-bridge|bridge\\.mjs' | grep -v grep", { + encoding: "utf-8", timeout: 3000, + }).trim(); + if (!out) return null; + + // Parse etime format: [[dd-]hh:]mm:ss + const etimeStr = out.split(/\s+/)[0]; + const parts = etimeStr.split(/[-:]/); + + let seconds = 0; + if (parts.length === 4) { + // dd-hh:mm:ss + seconds = parseInt(parts[0]) * 86400 + parseInt(parts[1]) * 3600 + parseInt(parts[2]) * 60 + parseInt(parts[3]); + } else if (parts.length === 3) { + // hh:mm:ss + seconds = parseInt(parts[0]) * 3600 + parseInt(parts[1]) * 60 + parseInt(parts[2]); + } else if (parts.length === 2) { + // mm:ss + seconds = parseInt(parts[0]) * 60 + parseInt(parts[1]); + } else { + return null; + } + + return seconds * 1000; + } catch { + return null; + } +} + async function checkBridge(): Promise { try { const controller = new AbortController(); @@ -159,8 +191,32 @@ async function checkBridge(): Promise { } } -function getSessions(): { name: string; alive: boolean }[] { - const results: { name: string; alive: boolean }[] = []; +function getSessionUptime(sessionName: string): number | null { + try { + const aliasFile = join(SOCKET_DIR, `${sessionName}.alias`); + const target = readlinkSync(aliasFile); + const sessionId = basename(target, ".sock"); + + // Find session file + const subdirs = readdirSync(SESSION_DIR); + for (const subdir of subdirs) { + const dirPath = join(SESSION_DIR, subdir); + try { + const files = readdirSync(dirPath); + const match = files.find((f) => f.includes(sessionId) && f.endsWith(".jsonl")); + if (match) { + const filePath = join(dirPath, match); + const stat = statSync(filePath); + return Date.now() - stat.birthtimeMs; + } + } catch { continue; } + } + } catch {} + return null; +} + +function getSessions(): { name: string; alive: boolean; uptimeMs: number | null }[] { + const results: { name: string; alive: boolean; uptimeMs: number | null }[] = []; const expected = ["control-agent", "sentry-agent"]; try { const files = readdirSync(SOCKET_DIR); @@ -168,20 +224,22 @@ function getSessions(): { name: string; alive: boolean }[] { for (const alias of expected) { const aliasFile = `${alias}.alias`; if (!aliases.includes(aliasFile)) { - results.push({ name: alias, alive: false }); + results.push({ name: alias, alive: false, uptimeMs: null }); continue; } try { const target = readlinkSync(join(SOCKET_DIR, aliasFile)); const sockPath = join(SOCKET_DIR, target); - results.push({ name: alias, alive: existsSync(sockPath) }); + const alive = existsSync(sockPath); + const uptimeMs = alive ? getSessionUptime(alias) : null; + results.push({ name: alias, alive, uptimeMs }); } catch { - results.push({ name: alias, alive: false }); + results.push({ name: alias, alive: false, uptimeMs: null }); } } } catch { for (const alias of expected) { - results.push({ name: alias, alive: false }); + results.push({ name: alias, alive: false, uptimeMs: null }); } } return results; @@ -230,6 +288,24 @@ function getWorktreeCount(): number { } } +function getServiceUptime(): number | null { + try { + const out = execSync("systemctl show baudbot --property=ActiveEnterTimestamp --value 2>/dev/null", { + encoding: "utf-8", + timeout: 3000, + }).trim(); + + if (!out || out === "" || out === "0") return null; + + const startTime = new Date(out); + if (isNaN(startTime.getTime())) return null; + + return Date.now() - startTime.getTime(); + } catch { + return null; + } +} + function readHeartbeatState(ctx: ExtensionContext): HeartbeatInfo { const info: HeartbeatInfo = { enabled: true, lastRunAt: null, totalRuns: 0, healthy: true }; for (const entry of ctx.sessionManager.getEntries()) { @@ -533,18 +609,29 @@ function renderDashboard( } const bridgeIcon = data.bridgeUp ? theme.fg("success", "●") : theme.fg("error", "●"); - const bridgeLabel = data.bridgeUp ? "up" : theme.fg("error", "DOWN"); - const bridgeTypeStr = data.bridgeType ? dim(` ${data.bridgeType}`) : ""; + let bridgeLabel: string; + if (!data.bridgeUp) { + bridgeLabel = theme.fg("error", "bridge DOWN"); + } else if (data.bridgeType && data.bridgeUptimeMs !== null) { + bridgeLabel = `bridge ${data.bridgeType} ${dim(`(up ${formatUptime(data.bridgeUptimeMs)})`)}`; + } else if (data.bridgeType) { + bridgeLabel = `bridge ${data.bridgeType}`; + } else { + bridgeLabel = "bridge up"; + } - const row1Left = ` baudbot ${bbDisplay} ${dim("│")} pi ${piDisplay} ${dim("│")} ${bridgeIcon} bridge ${bridgeLabel}${bridgeTypeStr}`; - const row1Right = dim(`up ${formatUptime(data.uptimeMs)}`); - lines.push(pad(row1Left, row1Right, width)); + const row1Left = ` baudbot ${bbDisplay} ${dim("│")} pi ${piDisplay} ${dim("│")} ${bridgeIcon} ${bridgeLabel}`; + lines.push(pad(row1Left, "", width)); - // ── Row 2: sessions ── + // ── Row 2: sessions with uptimes ── const parts: string[] = []; for (const s of data.sessions) { const icon = s.alive ? theme.fg("success", "●") : theme.fg("error", "●"); - const label = s.alive ? dim(s.name) : theme.fg("error", s.name); + const name = s.alive ? s.name : theme.fg("error", s.name); + const uptimeStr = s.alive && s.uptimeMs !== null + ? dim(`(up ${formatUptime(s.uptimeMs)})`) + : ""; + const label = uptimeStr ? `${name} ${uptimeStr}` : name; parts.push(`${icon} ${label}`); } if (data.devAgentCount > 0) { @@ -631,9 +718,6 @@ function renderDashboard( } } - // ── Bottom border ── - lines.push(truncateToWidth(dim(bar.repeat(width)), width)); - return lines; } @@ -641,7 +725,6 @@ function renderDashboard( export default function dashboardExtension(pi: ExtensionAPI): void { let timer: ReturnType | null = null; - const startTime = Date.now(); const piVersion = getPiVersion(); let data: DashboardData | null = null; @@ -661,6 +744,8 @@ export default function dashboardExtension(pi: ExtensionAPI): void { const worktreeCount = getWorktreeCount(); const baudbot = getBaudbotVersion(); const bridgeType = detectBridgeType(); + const bridgeUptimeMs = getBridgeUptime(); + const serviceUptimeMs = getServiceUptime(); const heartbeat = savedCtx ? readHeartbeatState(savedCtx) : { enabled: true, lastRunAt: null, totalRuns: 0, healthy: true }; data = { @@ -670,6 +755,7 @@ export default function dashboardExtension(pi: ExtensionAPI): void { baudbotSha: baudbot.sha, bridgeUp, bridgeType, + bridgeUptimeMs, sessions, devAgentCount: devAgents.count, devAgentNames: devAgents.names, @@ -677,7 +763,7 @@ export default function dashboardExtension(pi: ExtensionAPI): void { todosDone: todoStats.done, todosTotal: todoStats.total, worktreeCount, - uptimeMs: Date.now() - startTime, + serviceUptimeMs, lastRefresh: new Date(), heartbeat, lastEvent, @@ -696,7 +782,6 @@ export default function dashboardExtension(pi: ExtensionAPI): void { theme.fg("dim", "─".repeat(width)), ]; } - data.uptimeMs = Date.now() - startTime; return renderDashboard(data, activityFeed.getLines(), theme, width); }, invalidate() {}, diff --git a/slack-bridge/broker-bridge.mjs b/slack-bridge/broker-bridge.mjs index 2b42e38..734f2b8 100755 --- a/slack-bridge/broker-bridge.mjs +++ b/slack-bridge/broker-bridge.mjs @@ -740,11 +740,18 @@ function verifyBrokerEnvelope(message) { } function decryptEnvelope(message) { - const plaintext = sodium.crypto_box_seal_open( - fromBase64(message.encrypted), - cryptoState.serverBoxPublicKey, - cryptoState.serverBoxSecretKey, - ); + let plaintext; + try { + plaintext = sodium.crypto_box_seal_open( + fromBase64(message.encrypted), + cryptoState.serverBoxPublicKey, + cryptoState.serverBoxSecretKey, + ); + } catch (err) { + // Wrap libsodium errors (e.g., "incorrect key pair for the given ciphertext") + // into a format that isPoisonMessageError() can detect + throw new Error(`failed to decrypt broker envelope (message_id: ${message.message_id || "unknown"})`); + } if (!plaintext) { throw new Error(`failed to decrypt broker envelope (message_id: ${message.message_id || "unknown"})`); } diff --git a/test/broker-bridge.integration.test.mjs b/test/broker-bridge.integration.test.mjs index ca808c3..07b36db 100644 --- a/test/broker-bridge.integration.test.mjs +++ b/test/broker-bridge.integration.test.mjs @@ -308,6 +308,139 @@ describe("broker pull bridge semi-integration", () => { expect(valid).toBe(true); }); + it("acks poison messages with decryption failures (wrong keys)", async () => { + await sodium.ready; + + let pullCount = 0; + let ackPayload = null; + + // Generate valid encryption with mismatched keys to trigger "incorrect key pair" + const wrongBoxKeypair = sodium.crypto_box_keypair(); + const serverBoxKeypair = sodium.crypto_box_seed_keypair(new Uint8Array(Buffer.alloc(32, 11))); + const brokerSignKeypair = sodium.crypto_sign_seed_keypair(new Uint8Array(Buffer.alloc(32, 15))); + + const payload = JSON.stringify({ source: "slack", type: "message", payload: { text: "test" }, broker_timestamp: 123 }); + // Encrypt with WRONG key (wrongBoxKeypair) but bridge will try to decrypt with serverBoxKeypair + const encrypted = sodium.crypto_box_seal(new TextEncoder().encode(payload), wrongBoxKeypair.publicKey); + const encryptedB64 = toBase64(encrypted); + + const broker = createServer(async (req, res) => { + if (req.method === "POST" && req.url === "/api/inbox/pull") { + pullCount += 1; + const brokerTimestamp = Math.floor(Date.now() / 1000); + + const canonical = canonicalizeEnvelope("T123BROKER", brokerTimestamp, encryptedB64); + const signature = sodium.crypto_sign_detached(canonical, brokerSignKeypair.privateKey); + + const messages = pullCount === 1 + ? [{ + message_id: "m-decrypt-fail-1", + workspace_id: "T123BROKER", + encrypted: encryptedB64, + broker_timestamp: brokerTimestamp, + broker_signature: toBase64(signature), + }] + : []; + + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, messages })); + return; + } + + if (req.method === "POST" && req.url === "/api/inbox/ack") { + let raw = ""; + for await (const chunk of req) raw += chunk; + ackPayload = JSON.parse(raw); + + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, acked: ackPayload.message_ids?.length ?? 0 })); + return; + } + + if (req.method === "POST" && req.url === "/api/send") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: true, ts: "1234.5678" })); + return; + } + + res.writeHead(404, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ ok: false, error: "not found" })); + }); + + await new Promise((resolve) => broker.listen(0, "127.0.0.1", resolve)); + servers.push(broker); + + const address = broker.address(); + if (!address || typeof address === "string") { + throw new Error("failed to get broker test server address"); + } + const brokerUrl = `http://127.0.0.1:${address.port}`; + + const testFileDir = path.dirname(fileURLToPath(import.meta.url)); + const repoRoot = path.dirname(testFileDir); + const bridgePath = path.join(repoRoot, "slack-bridge", "broker-bridge.mjs"); + const bridgeCwd = path.join(repoRoot, "slack-bridge"); + + let bridgeStdout = ""; + let bridgeStderr = ""; + let bridgeExit = null; + + const bridge = spawn("node", [bridgePath], { + cwd: bridgeCwd, + env: { + ...cleanEnv(), + SLACK_BROKER_URL: brokerUrl, + SLACK_BROKER_WORKSPACE_ID: "T123BROKER", + SLACK_BROKER_SERVER_PRIVATE_KEY: toBase64(serverBoxKeypair.privateKey), + SLACK_BROKER_SERVER_PUBLIC_KEY: toBase64(serverBoxKeypair.publicKey), + SLACK_BROKER_SERVER_SIGNING_PRIVATE_KEY: b64(32, 13), + SLACK_BROKER_PUBLIC_KEY: b64(32, 14), + SLACK_BROKER_SIGNING_PUBLIC_KEY: toBase64(brokerSignKeypair.publicKey), + SLACK_BROKER_ACCESS_TOKEN: "test-broker-token", + SLACK_ALLOWED_USERS: "U_ALLOWED", + SLACK_BROKER_POLL_INTERVAL_MS: "50", + BRIDGE_API_PORT: "0", + }, + stdio: ["ignore", "pipe", "pipe"], + }); + + bridge.stdout.on("data", (chunk) => { + bridgeStdout += chunk.toString(); + }); + bridge.stderr.on("data", (chunk) => { + bridgeStderr += chunk.toString(); + }); + const bridgeExited = new Promise((_, reject) => { + bridge.on("error", (err) => { + if (ackPayload !== null) return; + reject(new Error(`bridge spawn error: ${err.message}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`)); + }); + bridge.on("exit", (code, signal) => { + bridgeExit = { code, signal }; + if (ackPayload !== null) return; + reject(new Error(`bridge exited early: code=${code} signal=${signal}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`)); + }); + }); + + children.push(bridge); + + const ackWait = waitFor( + () => ackPayload !== null, + 10_000, + 50, + `timeout waiting for ack; pullCount=${pullCount}; exit=${JSON.stringify(bridgeExit)}; stdout=${bridgeStdout}; stderr=${bridgeStderr}`, + ); + + await Promise.race([ackWait, bridgeExited]); + + expect(ackPayload.workspace_id).toBe("T123BROKER"); + expect(ackPayload.protocol_version).toBe("2026-02-1"); + expect(ackPayload.message_ids).toContain("m-decrypt-fail-1"); + + // Verify that the bridge logged a decryption error before acking + expect(bridgeStderr).toContain("failed to decrypt"); + }); + it("forwards user messages to agent in fire-and-forget mode without get_message/turn_end RPCs", async () => { await sodium.ready;