diff --git a/apps/code/src/renderer/desktop-contributions.ts b/apps/code/src/renderer/desktop-contributions.ts index 3bf9dcd7e6..4ba168b7d7 100644 --- a/apps/code/src/renderer/desktop-contributions.ts +++ b/apps/code/src/renderer/desktop-contributions.ts @@ -1,3 +1,4 @@ +import { agentChatCoreModule } from "@posthog/core/agent-chat/agentChat.module"; import { billingCoreModule } from "@posthog/core/billing/billing.module"; import { inboxCoreModule } from "@posthog/core/inbox/inbox.module"; import { githubConnectModule } from "@posthog/core/integrations/githubConnect.module"; @@ -25,6 +26,7 @@ import { container } from "@renderer/di/container"; export function registerDesktopContributions(): void { for (const module of [ + agentChatCoreModule, agentUiModule, authUiModule, billingUiModule, diff --git a/packages/api-client/src/agent-analytics.test.ts b/packages/api-client/src/agent-analytics.test.ts index e17d3b06d3..92d9ae1809 100644 --- a/packages/api-client/src/agent-analytics.test.ts +++ b/packages/api-client/src/agent-analytics.test.ts @@ -31,12 +31,15 @@ describe("buildAgentAnalyticsQueries", () => { }); it("narrows to a single application id when given", () => { - const q = buildAgentAnalyticsQueries("app-uuid-123"); - expect(q.kpi).toContain( - "properties.$agent_application_id = 'app-uuid-123'", - ); - expect(q.byModel).toContain( - "properties.$agent_application_id = 'app-uuid-123'", + const id = "11111111-2222-3333-4444-555566667777"; + const q = buildAgentAnalyticsQueries(id); + expect(q.kpi).toContain(`properties.$agent_application_id = '${id}'`); + expect(q.byModel).toContain(`properties.$agent_application_id = '${id}'`); + }); + + it("rejects a non-uuid application id", () => { + expect(() => buildAgentAnalyticsQueries("app-uuid-123")).toThrow( + /must be a UUID/, ); }); }); diff --git a/packages/api-client/src/agent-analytics.ts b/packages/api-client/src/agent-analytics.ts index 7e1afdb814..d8b4b757e5 100644 --- a/packages/api-client/src/agent-analytics.ts +++ b/packages/api-client/src/agent-analytics.ts @@ -36,11 +36,18 @@ export interface AgentAnalyticsRaw { /** Only the agents' own traffic — not the team's other LLM events. */ const AGENT_ORIGIN = "properties.$ai_origin = 'agent_platform_runner'"; +const UUID_RE = + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + /** - * Shared WHERE scope. `applicationId` (a trusted UUID from the agent record) - * narrows the board to a single agent for the per-agent Observability tab. + * Shared WHERE scope narrowing the board to a single agent. `applicationId` is + * a trusted server UUID, but reject anything non-UUID before interpolating it + * into HogQL rather than rely on that. */ function scope(applicationId?: string): string { + if (applicationId && !UUID_RE.test(applicationId)) { + throw new Error("agent analytics: applicationId must be a UUID"); + } const agent = applicationId ? ` AND properties.$agent_application_id = '${applicationId}'` : ""; diff --git a/packages/core/src/agent-chat/agentChat.module.ts b/packages/core/src/agent-chat/agentChat.module.ts new file mode 100644 index 0000000000..c02631285e --- /dev/null +++ b/packages/core/src/agent-chat/agentChat.module.ts @@ -0,0 +1,8 @@ +import { ContainerModule } from "inversify"; +import { AgentChatService } from "./agentChatService"; +import { AGENT_CHAT_SERVICE } from "./identifiers"; + +export const agentChatCoreModule = new ContainerModule(({ bind }) => { + bind(AgentChatService).toSelf().inSingletonScope(); + bind(AGENT_CHAT_SERVICE).toService(AgentChatService); +}); diff --git a/packages/ui/src/features/agent-applications/hooks/useAgentChat.test.tsx b/packages/core/src/agent-chat/agentChatService.test.ts similarity index 78% rename from packages/ui/src/features/agent-applications/hooks/useAgentChat.test.tsx rename to packages/core/src/agent-chat/agentChatService.test.ts index 8956e6b452..a40615904c 100644 --- a/packages/ui/src/features/agent-applications/hooks/useAgentChat.test.tsx +++ b/packages/core/src/agent-chat/agentChatService.test.ts @@ -1,7 +1,13 @@ -import { agentChatStore } from "@posthog/core/agent-chat/agentChatStore"; +import type { PostHogAPIClient } from "@posthog/api-client/posthog-client"; import type { AgentSessionEvent } from "@posthog/shared/agent-platform-types"; -import { act, renderHook } from "@testing-library/react"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { AgentChatService } from "./agentChatService"; +import { agentChatStore } from "./agentChatStore"; +import type { AgentChatSession } from "./identifiers"; + +const INGRESS = "https://ingress.example"; +const SLUG = "my-agent"; +const SESSION = "sess-1"; const mockClient = vi.hoisted(() => ({ runAgentSession: vi.fn(), @@ -13,18 +19,24 @@ const mockClient = vi.hoisted(() => ({ sendAgentInteractiveToolResult: vi.fn(), mintAgentPreviewToken: vi.fn(), })); -vi.mock("@posthog/ui/features/auth/authClient", () => ({ - useAuthenticatedClient: () => mockClient, -})); -vi.mock("@posthog/ui/primitives/toast", () => ({ - toast: { error: vi.fn(), warning: vi.fn(), info: vi.fn() }, -})); +const client = mockClient as unknown as PostHogAPIClient; -import { useAgentChat } from "./useAgentChat"; - -const INGRESS = "https://ingress.example"; -const SLUG = "my-agent"; -const SESSION = "sess-1"; +function session(chatId: string): AgentChatSession { + return { + chatId, + agentSlug: SLUG, + ingressBaseUrl: INGRESS, + revisionId: null, + createMapper: () => ({ + seedUserMessage: () => [], + setPromptIdBase: () => {}, + apply: () => [], + }), + resolveClientTool: async () => null, + buildWireText: (text) => text, + mapConversation: () => [], + }; +} function ev(kind: AgentSessionEvent["kind"], data: unknown): AgentSessionEvent { return { @@ -46,16 +58,13 @@ async function* streamThenDrop(...events: AgentSessionEvent[]) { throw new Error("network reset"); } -function render(chatId: string) { - return renderHook(() => - useAgentChat({ chatId, agentSlug: SLUG, ingressBaseUrl: INGRESS }), - ); -} +describe("AgentChatService /listen reconnect", () => { + let service: AgentChatService; -describe("useAgentChat /listen reconnect", () => { beforeEach(() => { vi.clearAllMocks(); vi.useFakeTimers(); + service = new AgentChatService(); mockClient.runAgentSession.mockResolvedValue({ session_id: SESSION }); }); afterEach(() => { @@ -70,15 +79,10 @@ describe("useAgentChat /listen reconnect", () => { ) .mockImplementationOnce(() => streamOf(ev("completed", {}))); - const { result } = render(chatId); - await act(async () => { - void result.current.send("go"); - }); + void service.send(client, session(chatId), "go"); // Let start() + the first pump (delta then drop) settle, then clear the // reconnect backoff so the second attach runs. - await act(async () => { - await vi.advanceTimersByTimeAsync(1000); - }); + await vi.advanceTimersByTimeAsync(1000); expect(mockClient.streamAgentSession).toHaveBeenCalledTimes(2); // A live drop never asks the api whether the session ended — the re-attach @@ -100,13 +104,8 @@ describe("useAgentChat /listen reconnect", () => { conversation: [], }); - const { result } = render(chatId); - await act(async () => { - void result.current.send("go"); - }); - await act(async () => { - await vi.advanceTimersByTimeAsync(0); - }); + void service.send(client, session(chatId), "go"); + await vi.advanceTimersByTimeAsync(0); // Silent re-attach → we ask the api, see it's terminal, and stop. No retry, // no error. @@ -134,13 +133,8 @@ describe("useAgentChat /listen reconnect", () => { }, ); - const { result } = render(chatId); - await act(async () => { - void result.current.send("go"); - }); - await act(async () => { - await vi.advanceTimersByTimeAsync(0); - }); + void service.send(client, session(chatId), "go"); + await vi.advanceTimersByTimeAsync(0); const chat = agentChatStore.getState().chats[chatId]; expect(chat?.status).toBe("completed"); @@ -162,14 +156,9 @@ describe("useAgentChat /listen reconnect", () => { conversation: [], }); - const { result } = render(chatId); - await act(async () => { - void result.current.send("go"); - }); + void service.send(client, session(chatId), "go"); // Drain the full capped-exponential backoff schedule (≈23.5s). - await act(async () => { - await vi.advanceTimersByTimeAsync(30_000); - }); + await vi.advanceTimersByTimeAsync(30_000); // Initial attach + MAX_LISTEN_RECONNECTS (6) re-attaches. expect(mockClient.streamAgentSession).toHaveBeenCalledTimes(7); diff --git a/packages/core/src/agent-chat/agentChatService.ts b/packages/core/src/agent-chat/agentChatService.ts new file mode 100644 index 0000000000..8631cf5aa4 --- /dev/null +++ b/packages/core/src/agent-chat/agentChatService.ts @@ -0,0 +1,579 @@ +import type { PostHogAPIClient } from "@posthog/api-client/posthog-client"; +import { injectable } from "inversify"; +import { agentChatStore } from "./agentChatStore"; +import type { + AgentChatMapper, + AgentChatSession, + ClientToolCallData, +} from "./identifiers"; + +/** Session states with no further activity to tail — render stored history only. */ +const TERMINAL_SESSION_STATES = new Set([ + "completed", + "closed", + "cancelled", + "failed", +]); + +/** + * Bounded reconnect budget for a dropped `/listen` tail. A re-attach that yields + * any event resets the budget, so a healthy long run that keeps getting closed + * out (idle timeouts, proxy recycling) reconnects indefinitely; only a genuinely + * dead or vanished stream exhausts it and surfaces an error. + */ +const MAX_LISTEN_RECONNECTS = 6; + +/** Reserve a margin so we mint a fresh token before the server rejects the old one. */ +const PREVIEW_TOKEN_EARLY_REFRESH_MS = 30_000; + +/** Exponential backoff (capped at 8s) between `/listen` reconnect attempts. */ +function reconnectBackoffMs(attempt: number): number { + return Math.min(500 * 2 ** (attempt - 1), 8_000); +} + +/** Resolve after `ms`, or early (→ false) if `signal` aborts; else → true. */ +function delay(ms: number, signal: AbortSignal): Promise { + if (signal.aborted) return Promise.resolve(false); + return new Promise((resolve) => { + const onAbort = () => { + clearTimeout(timer); + resolve(false); + }; + const timer = setTimeout(() => { + signal.removeEventListener("abort", onAbort); + resolve(true); + }, ms); + signal.addEventListener("abort", onAbort, { once: true }); + }); +} + +/** + * The ingress signals an expired/missing preview token with the fetcher's + * `Failed request: [401] …` shape, same as any other auth failure. Anything + * else falls through to the caller as a normal error. + */ +function isPreviewAuthError(err: unknown): boolean { + return err instanceof Error && /\[401\]/.test(err.message); +} + +interface CachedPreviewToken { + token: string; + expiresAtMs: number; +} + +/** Per-chat saga state — one live `/listen` loop, mapper, and token cache. */ +interface ChatRuntime { + mapper: AgentChatMapper; + abort: AbortController | null; + streaming: boolean; + /** Each stream attach bumps this; a superseded loop checks it before touching the store. */ + epoch: number; + previewToken: CachedPreviewToken | null; + revisionId: string | null; +} + +/** + * Drives live chats against deployed agents' ingress: starts/sends/cancels via + * the api-client, streams SSE through the host's mapper, and pumps the resulting + * ACP messages into the core `agentChatStore` keyed by `chatId` (so the agent + * builder dock and per-agent previews coexist). One `ChatRuntime` per chat holds + * the reconnect loop, epoch supersede, and preview-token cache. + * + * The renderer hook supplies the authenticated client per call and the UI seams + * (mapper, client-tool resolution, context envelope, history) via the session. + */ +@injectable() +export class AgentChatService { + private readonly runtimes = new Map(); + + /** Ensure a runtime exists, dropping a cached token when the revision changes. */ + private runtime(session: AgentChatSession): ChatRuntime { + let rt = this.runtimes.get(session.chatId); + if (!rt) { + rt = { + mapper: session.createMapper(), + abort: null, + streaming: false, + epoch: 0, + previewToken: null, + revisionId: session.revisionId, + }; + this.runtimes.set(session.chatId, rt); + } + // A token is bound to a specific (app, revision); a stale one wouldn't route + // to the new target when the consumer flips revisions (incl. live ↔ draft). + if (rt.revisionId !== session.revisionId) { + rt.revisionId = session.revisionId; + rt.previewToken = null; + } + return rt; + } + + /** + * Mint a preview token if we don't have one, or refresh it just before expiry. + * `force` skips the cache (post-401 retry). Returns null for live revisions. + */ + private async getPreviewToken( + client: PostHogAPIClient, + rt: ChatRuntime, + session: AgentChatSession, + force = false, + ): Promise { + if (!session.revisionId) return null; + const cached = rt.previewToken; + if ( + !force && + cached && + cached.expiresAtMs - Date.now() > PREVIEW_TOKEN_EARLY_REFRESH_MS + ) { + return cached.token; + } + const minted = await client.mintAgentPreviewToken( + session.agentSlug, + session.revisionId, + ); + rt.previewToken = { + token: minted.token, + // Backend returns TTL in seconds; store an absolute deadline so the + // early-refresh comparison is straight subtraction. + expiresAtMs: Date.now() + minted.expires_in * 1000, + }; + return minted.token; + } + + /** + * Run an ingress call with the cached preview token; on the fetcher's `[401]`, + * mint fresh and retry once. For live revisions this is just `call(null)`. + */ + private async withPreviewToken( + client: PostHogAPIClient, + rt: ChatRuntime, + session: AgentChatSession, + call: (token: string | null) => Promise, + ): Promise { + const token = await this.getPreviewToken(client, rt, session); + try { + return await call(token); + } catch (err) { + if (!session.revisionId || !isPreviewAuthError(err)) throw err; + const fresh = await this.getPreviewToken(client, rt, session, true); + return call(fresh); + } + } + + private async dispatchClientTool( + client: PostHogAPIClient, + rt: ChatRuntime, + session: AgentChatSession, + data: ClientToolCallData, + sessionId: string, + ): Promise { + const outcome = await session.resolveClientTool(data); + // Interactive tools (set_secret) post their own outcome later. + if (!outcome || outcome.defer) return; + try { + await this.withPreviewToken(client, rt, session, (token) => + client.sendAgentClientToolResult( + session.ingressBaseUrl, + sessionId, + data.call_id, + outcome, + token, + ), + ); + } catch { + // Best-effort — the session will time the call out if this fails. + } + } + + private async runStream( + client: PostHogAPIClient, + session: AgentChatSession, + sessionId: string, + ): Promise { + const { chatId } = session; + const rt = this.runtime(session); + // Supersede any in-flight stream (resume / new chat) and claim this epoch. + rt.abort?.abort(); + const epoch = ++rt.epoch; + const controller = new AbortController(); + rt.abort = controller; + rt.streaming = true; + const store = agentChatStore.getState(); + // True the moment a (re)attached stream yields a real event, so the + // reconnect loop can tell "still producing output" from "attached to a + // silent or ended session". Reset before every pump attempt. + let madeProgress = false; + // Last non-auth stream error, surfaced only if reconnects are exhausted. + let lastDropError: string | null = null; + // Pump the SSE generator with the supplied token. Returns: + // "remint" — server signalled `preview_token_required`; mint + reconnect. + // "auth_failure" — initial fetch 401'd; safety-net retry once. + // "done" — the stream ended (terminal frame, drop, or supersede). + const pump = async ( + token: string | null, + ): Promise<"remint" | "auth_failure" | "done"> => { + try { + for await (const event of client.streamAgentSession( + session.ingressBaseUrl, + sessionId, + controller.signal, + token, + )) { + if (rt.epoch !== epoch) return "done"; + // Control event: don't surface to the user, just request a remint. + if (event.kind === "preview_token_required") return "remint"; + // Hard end (meta-end-session): the session is sealed and rejects + // further `/send`s. Unlike `completed` (turn-end, stays open), this is + // terminal — finalize and stop tailing. The mapper renders nothing for + // it, so skip the append like the remint. + if (event.kind === "closed") { + store.setStatus(chatId, "completed"); + return "done"; + } + madeProgress = true; + store.appendMessages(chatId, rt.mapper.apply(event)); + if (event.kind === "client_tool_call") { + void this.dispatchClientTool( + client, + rt, + session, + event.data, + sessionId, + ); + } else if (event.kind === "completed") { + store.setStatus(chatId, "completed"); + } else if (event.kind === "waiting") { + store.setStatus(chatId, "awaiting_input"); + } else if (event.kind === "failed") { + store.setStatus(chatId, "failed"); + store.setError( + chatId, + event.data?.reason ?? "The agent run failed.", + ); + } + } + return "done"; + } catch (err) { + if ( + session.revisionId && + !controller.signal.aborted && + isPreviewAuthError(err) + ) { + return "auth_failure"; + } + // Network reset / idle-timeout close / parse failure: remember it but + // don't surface yet — the loop reconnects, and only errors if the + // session is gone or the reconnect budget is exhausted. + if (!controller.signal.aborted) { + lastDropError = err instanceof Error ? err.message : null; + } + return "done"; + } + }; + // Is the run still live? `/listen` can't replay a terminal frame missed + // during a gap, so on a silent re-attach we ask the api before retrying. + const sessionLiveState = async (): Promise< + "live" | "terminal" | "unknown" + > => { + try { + const detail = await client.getAgentApplicationSession( + session.agentSlug, + sessionId, + ); + return !detail || TERMINAL_SESSION_STATES.has(detail.state) + ? "terminal" + : "live"; + } catch { + return "unknown"; + } + }; + try { + let token = await this.getPreviewToken(client, rt, session); + // `preview_token_required` is unbounded (one re-mint per ~15 min TTL); a + // true `[401]` only gets one retry as a safety net for the initial fetch. + let authRetried = false; + let reconnectAttempts = 0; + while (true) { + madeProgress = false; + const outcome = await pump(token); + if (rt.epoch !== epoch || controller.signal.aborted) break; + if (outcome === "remint") { + token = await this.getPreviewToken(client, rt, session, true); + continue; + } + if (outcome === "auth_failure" && !authRetried) { + authRetried = true; + token = await this.getPreviewToken(client, rt, session, true); + continue; + } + if (outcome === "auth_failure") { + store.setError( + chatId, + "Preview session failed to authenticate. Try again.", + ); + break; + } + // outcome === "done": a terminal/`waiting` frame already moved us off + // "streaming" — that's an expected end, so stop. + if (agentChatStore.getState().chats[chatId]?.status !== "streaming") { + break; + } + // Still "streaming" → the connection dropped while the run is live. + if (madeProgress) { + // The re-attach produced output: reset the budget so repeated idle + // drops never exhaust it. + reconnectAttempts = 0; + } else { + // Silence on (re)attach: confirm the run didn't just finish in the gap + // before spending the budget. + const liveState = await sessionLiveState(); + if (rt.epoch !== epoch || controller.signal.aborted) break; + if (liveState === "terminal") { + store.setStatus(chatId, "completed"); + break; + } + } + if (reconnectAttempts >= MAX_LISTEN_RECONNECTS) { + store.setError( + chatId, + lastDropError ?? + "Lost connection to the agent. Send a message to retry.", + ); + break; + } + reconnectAttempts += 1; + const waited = await delay( + reconnectBackoffMs(reconnectAttempts), + controller.signal, + ); + if (!waited || rt.epoch !== epoch || controller.signal.aborted) break; + // Refresh a preview token that may have lapsed across the gap. + token = await this.getPreviewToken(client, rt, session); + } + } catch (err) { + // A `getPreviewToken` throw (mint or re-mint) lands here — `pump` handles + // its own errors. Without this the rejection would slip past `finally` + // (which only flips status) and the stream would quietly stop. + if (rt.epoch === epoch && !controller.signal.aborted) { + store.setError( + chatId, + err instanceof Error ? err.message : "Preview session unavailable.", + ); + } + } finally { + // The loop has fully broken (terminal frame, exhausted budget, or + // supersede), so release the still-open `/listen` socket: the server tails + // perpetually and only tears down on client disconnect. `controller` is + // run-local; re-aborting an aborted one is a no-op. + controller.abort(); + if (rt.epoch === epoch) { + rt.streaming = false; + // Stream ended without a terminal frame mid-conversation → treat as + // awaiting input so the composer stays usable. + if (agentChatStore.getState().chats[chatId]?.status === "streaming") { + agentChatStore.getState().setStatus(chatId, "awaiting_input"); + } + } + } + } + + async start( + client: PostHogAPIClient, + session: AgentChatSession, + text: string, + ): Promise { + const rt = this.runtime(session); + rt.mapper = session.createMapper(); + const s = agentChatStore.getState(); + s.begin(session.chatId, session.agentSlug); + // Render the user's clean message immediately; the stream's echo (which + // includes the context envelope) is stripped + deduped by the mapper. + s.appendMessages(session.chatId, rt.mapper.seedUserMessage(text)); + try { + const { session_id } = await this.withPreviewToken( + client, + rt, + session, + (token) => + client.runAgentSession( + session.ingressBaseUrl, + session.buildWireText(text), + token, + ), + ); + agentChatStore.getState().setSessionId(session.chatId, session_id); + agentChatStore.getState().setStatus(session.chatId, "streaming"); + session.onSessionStarted?.(session_id, text); + void this.runStream(client, session, session_id); + } catch (err) { + agentChatStore.getState().setStatus(session.chatId, "failed"); + agentChatStore + .getState() + .setError( + session.chatId, + err instanceof Error ? err.message : "Couldn't start chat.", + ); + } + } + + async send( + client: PostHogAPIClient, + session: AgentChatSession, + text: string, + ): Promise { + const s = agentChatStore.getState(); + const sessionId = s.chats[session.chatId]?.sessionId; + if (!sessionId) return this.start(client, session, text); + const rt = this.runtime(session); + // Render the user's message immediately; the stream's echo is deduped. + s.appendMessages(session.chatId, rt.mapper.seedUserMessage(text)); + s.setStatus(session.chatId, "streaming"); + try { + await this.withPreviewToken(client, rt, session, (token) => + client.sendAgentMessage(session.ingressBaseUrl, sessionId, text, token), + ); + if (!rt.streaming) void this.runStream(client, session, sessionId); + } catch (err) { + s.setStatus(session.chatId, "failed"); + s.setError( + session.chatId, + err instanceof Error ? err.message : "Couldn't send.", + ); + } + } + + async cancel( + client: PostHogAPIClient, + session: AgentChatSession, + ): Promise { + const s = agentChatStore.getState(); + const sessionId = s.chats[session.chatId]?.sessionId; + const rt = this.runtime(session); + rt.abort?.abort(); + s.setStatus(session.chatId, "cancelled"); + if (sessionId && session.ingressBaseUrl) { + try { + await this.withPreviewToken(client, rt, session, (token) => + client.cancelAgentSession(session.ingressBaseUrl, sessionId, token), + ); + } catch { + // Best-effort. + } + } + } + + /** + * Resolve an interactive client tool (set_secret) once the user submits its + * form: post the outcome via `/send` (waking the parked session) and make sure + * the stream is attached to receive the resulting turn. + */ + async resolveInteractiveTool( + client: PostHogAPIClient, + session: AgentChatSession, + callId: string, + outcome: { result: Record } | { error: string }, + ): Promise { + const sessionId = + agentChatStore.getState().chats[session.chatId]?.sessionId; + if (!sessionId) return; + const rt = this.runtime(session); + agentChatStore.getState().setStatus(session.chatId, "streaming"); + try { + await this.withPreviewToken(client, rt, session, (token) => + client.sendAgentInteractiveToolResult( + session.ingressBaseUrl, + sessionId, + callId, + outcome, + token, + ), + ); + if (!rt.streaming) void this.runStream(client, session, sessionId); + } catch (err) { + agentChatStore.getState().setStatus(session.chatId, "awaiting_input"); + agentChatStore + .getState() + .setError( + session.chatId, + err instanceof Error ? err.message : "Couldn't submit the secret.", + ); + } + } + + /** + * Re-open a past chat. `/listen` only tails (it does not replay), so history is + * rebuilt from the stored transcript; a still-active session then attaches the + * live stream so the user can keep chatting where they left off. + */ + async resume( + client: PostHogAPIClient, + session: AgentChatSession, + sessionId: string, + ): Promise { + if ( + agentChatStore.getState().chats[session.chatId]?.sessionId === sessionId + ) + return; + const rt = this.runtime(session); + rt.abort?.abort(); + rt.epoch += 1; + rt.streaming = false; + rt.mapper = session.createMapper(); + const s = agentChatStore.getState(); + s.begin(session.chatId, session.agentSlug); + s.setSessionId(session.chatId, sessionId); + s.setStatus(session.chatId, "starting"); + try { + const detail = await client.getAgentApplicationSession( + session.agentSlug, + sessionId, + ); + // A newer resume/new-chat won the race while we were fetching. + if ( + agentChatStore.getState().chats[session.chatId]?.sessionId !== sessionId + ) + return; + const conversation = detail?.conversation ?? []; + agentChatStore + .getState() + .appendMessages(session.chatId, session.mapConversation(conversation)); + rt.mapper.setPromptIdBase( + conversation.filter((m) => m.role === "user").length, + ); + if (!detail || TERMINAL_SESSION_STATES.has(detail.state)) { + agentChatStore.getState().setStatus(session.chatId, "completed"); + } else { + agentChatStore.getState().setStatus(session.chatId, "streaming"); + void this.runStream(client, session, sessionId); + } + } catch (err) { + if ( + agentChatStore.getState().chats[session.chatId]?.sessionId !== sessionId + ) + return; + agentChatStore.getState().setStatus(session.chatId, "failed"); + agentChatStore + .getState() + .setError( + session.chatId, + err instanceof Error ? err.message : "Couldn't load this chat.", + ); + } + } + + /** Clear the surface for a brand-new chat; the next send starts a new session. */ + newChat(session: AgentChatSession): void { + const rt = this.runtime(session); + rt.abort?.abort(); + rt.epoch += 1; + rt.streaming = false; + rt.mapper = session.createMapper(); + agentChatStore.getState().reset(session.chatId); + } + + /** Release the open `/listen` socket when the consumer unmounts. */ + releaseStream(chatId: string): void { + this.runtimes.get(chatId)?.abort?.abort(); + } +} diff --git a/packages/core/src/agent-chat/identifiers.ts b/packages/core/src/agent-chat/identifiers.ts new file mode 100644 index 0000000000..b12805f0b5 --- /dev/null +++ b/packages/core/src/agent-chat/identifiers.ts @@ -0,0 +1,63 @@ +import type { AcpMessage } from "@posthog/shared"; +import type { + AgentConversationMessage, + AgentSessionEvent, +} from "@posthog/shared/agent-platform-types"; + +export const AGENT_CHAT_SERVICE = Symbol.for("posthog.core.agentChat.service"); + +/** + * Incremental SSE→ACP mapper, implemented in the UI (`createAgentChatMapper`) + * and handed to the service per chat. Stateful: a fresh one per session/stream. + */ +export interface AgentChatMapper { + /** Optimistically render the user's just-sent message; the echoed frame is deduped. */ + seedUserMessage(text: string, ts?: number): AcpMessage[]; + /** Continue prompt-id numbering past `count` restored turns (resume). */ + setPromptIdBase(count: number): void; + /** Translate one SSE event into zero or more ACP messages. */ + apply(event: AgentSessionEvent): AcpMessage[]; +} + +export type ClientToolCallData = Extract< + AgentSessionEvent, + { kind: "client_tool_call" } +>["data"]; + +/** + * A client-tool result. `defer: true` means the host opened an interactive UI + * and will post the outcome itself, so the service must not post one now. + */ +export interface ClientToolOutcome { + result?: unknown; + error?: string; + defer?: boolean; +} + +/** + * Per-chat host seam supplied by the renderer hook. The transport saga lives in + * the service; mapping, client-tool resolution, the context envelope, and local + * history are UI concerns the service calls back into. Callbacks are expected to + * be stable and read the latest handlers internally, so a long-lived stream + * always sees current state. + */ +export interface AgentChatSession { + /** Opaque key isolating this chat in the store. */ + chatId: string; + /** Agent slug the chat targets. */ + agentSlug: string; + ingressBaseUrl: string; + /** Non-null targets a specific draft revision (preview token attached per call). */ + revisionId: string | null; + createMapper(): AgentChatMapper; + /** Resolve a client-tool call; `defer`/null ⇒ the service won't post a result. */ + resolveClientTool( + data: ClientToolCallData, + ): Promise; + /** Compose the wire text for a first message (prepends the context envelope). */ + buildWireText(text: string): string; + /** Map a stored transcript to ACP messages (resume). */ + mapConversation(messages: AgentConversationMessage[]): AcpMessage[]; + /** Fired once a run starts, so the host can index local history. */ + onSessionStarted?(sessionId: string, text: string): void; +} diff --git a/packages/shared/src/agent-platform-types.ts b/packages/shared/src/agent-platform-types.ts index dc79f7fc45..83e5399776 100644 --- a/packages/shared/src/agent-platform-types.ts +++ b/packages/shared/src/agent-platform-types.ts @@ -1,11 +1,5 @@ -// Domain types for the agent_platform product surface (deployed agents, -// their revisions, sessions, approvals, and fleet rollups). These mirror the -// PostHog Cloud REST serializers (Django app `agent_platform`) and are the wire -// shapes returned by the corresponding PostHogAPIClient methods. Field names -// stay snake_case to match the JSON exactly, as with the other shared wire -// types (see inbox-types.ts). - -// --- Enums ----------------------------------------------------------------- +// Wire shapes mirroring the PostHog Cloud REST serializers (Django app +// `agent_platform`). Field names stay snake_case to match the JSON exactly. export type AgentSessionState = | "queued" @@ -34,8 +28,6 @@ export type AgentApprovalRequestState = export type AgentApprovalDecision = "approve" | "reject"; -// --- Applications ---------------------------------------------------------- - /** Resolved creator (from `created_by_id`), or null if unset/deleted. */ export interface AgentApplicationCreator { id?: number; @@ -65,12 +57,9 @@ export interface AgentApplication { ingress_base_url: string | null; } -// --- Revisions ------------------------------------------------------------- - /** - * The agent spec carried on a revision. Fully typed elaboration (triggers, - * tools, mcps, skills, limits) lands with the config editor milestone; for now - * the known top-level fields are surfaced and the rest passes through. + * The agent spec carried on a revision. Known top-level fields are surfaced and + * the rest passes through pending fully-typed elaboration. */ export interface AgentSpec { model: string; @@ -104,16 +93,12 @@ export interface AgentRevision { updated_at: string; } -// --- Preview tokens -------------------------------------------------------- // `…/agent_applications/{id}/preview-token/?revision_id=` mints a // short-lived HS256 JWT that authorizes the ingress to route /run /send /listen // /cancel against a non-live revision. Sent on those calls via the // `X-Agent-Preview-Token` header (or `?preview_token=` query for EventSource), -// alongside the usual PostHog bearer (which the fetcher attaches regardless -// of host). -// -// The response is self-describing: `endpoints` carries the per-trigger preview -// URLs the caller should hit directly, so the client never has to derive a +// alongside the usual PostHog bearer. The response's `endpoints` carry the +// per-trigger preview URLs to hit directly, so the client never derives a // revision-scoped ingress URL by string-mangling `application.ingress_base_url`. /** Per-trigger preview URLs, keyed by trigger type → action → absolute URL. */ @@ -138,7 +123,6 @@ export interface AgentPreviewToken { preview_proxy: Record; } -// --- Bundle files ---------------------------------------------------------- // `…/revisions/{id}/bundle/` returns a typed bundle ({ agent_md, skills, tools }); // the client flattens it into these per-file rows keyed by canonical path // (agent.md, skills//SKILL.md, tools//source.ts, tools//schema.json). @@ -151,7 +135,6 @@ export interface BundleFile { language: BundleFileLanguage; } -// --- Slack setup ----------------------------------------------------------- // `…/revisions/{id}/slack_manifest/` derives the Slack app manifest from the // revision's slack trigger + tools (scopes + event subscriptions computed). @@ -164,7 +147,6 @@ export interface AgentSlackManifest { interactivity_url: string | null; } -// --- Memory ---------------------------------------------------------------- // The agent's S3-backed memory store: markdown files (`…/memory/…`) plus the // JSONL reference tables the @posthog/table-* tools write. @@ -211,8 +193,6 @@ export interface AgentMemoryTableRows { rows: Record[]; } -// --- Sessions -------------------------------------------------------------- - export interface AgentSessionUsageTotal { tokens_in: number; tokens_out: number; @@ -258,11 +238,10 @@ export interface AgentApplicationSessionsListResponse { count: number; } -// --- Conversation transcript (stored shape on a session) ------------------- -// The runtime persists pi-ai's `conversation` array. The SSE→ACP adapter and -// the session-detail transcript both narrow these `content` parts at runtime. -// Part shapes mirror what the agent-console apiClient narrows (text/thinking/ -// toolCall for assistants; text/image for users; text for tool results). +// Stored conversation shape on a session: the runtime persists pi-ai's +// `conversation` array. Part shapes mirror what the agent-console apiClient +// narrows (text/thinking/toolCall for assistants; text/image for users; text +// for tool results). export interface AgentTextPart { type: "text"; @@ -352,7 +331,6 @@ export interface AgentApplicationSessionDetail { conversation_total_turns?: number; } -// --- Session logs ---------------------------------------------------------- // `…/sessions/{id}/logs/` returns rows from the shared ClickHouse `log_entries` // table via `fetch_log_entries` — the same flat shape hog_function logs use. @@ -377,8 +355,6 @@ export interface AgentSessionLogsParams { before?: string; } -// --- Fleet ----------------------------------------------------------------- - export interface AgentFleetLiveSessionSummary { id: string; application_id: string; @@ -399,8 +375,6 @@ export interface AgentFleetLiveSessionsResponse { results: AgentFleetLiveSessionSummary[]; } -// --- Approvals ------------------------------------------------------------- - export interface AgentApprovalRequest { id: string; session_id: string; @@ -431,8 +405,6 @@ export interface DecideApprovalRequest { reason?: string; } -// --- Query params ---------------------------------------------------------- - export interface AgentSessionsListParams { limit?: number; offset?: number; @@ -450,11 +422,10 @@ export interface AgentApprovalsListParams { offset?: number; } -// --- Live session events (agent-ingress SSE stream) ------------------------ -// The chat trigger's `/listen` endpoint streams these as `text/event-stream` -// JSON frames. The SSE→ACP adapter folds them into ACP messages the native -// ConversationView renders. The `kind` discriminator and `data` payloads come -// from `agent-ingress/src/triggers/chat.ts` + `agent-runner/src/loop/bus.ts`. +// Live session events from the chat trigger's `/listen` endpoint (SSE +// `text/event-stream` JSON frames). The `kind` discriminator and `data` +// payloads come from `agent-ingress/src/triggers/chat.ts` + +// `agent-runner/src/loop/bus.ts`. interface AgentSessionEventBase { session_id: string; @@ -565,12 +536,9 @@ export type AgentClientToolResultEvent = AgentSessionEventBase & { }; /** - * Draft-preview only. The server fires this on `/listen` ~5s before the - * preview token expires (and then closes the stream): the client mints a - * fresh token and reconnects to the same session. The kind alone is the - * signal — `data` is structurally `Record` (matching - * `AgentClosedEvent`) for the discriminated-union shape, but no fields are - * defined or read. + * Draft-preview only. Server fires this on `/listen` ~5s before the preview + * token expires (then closes the stream); the client mints a fresh token and + * reconnects. The kind alone is the signal — `data` is unused. */ export type AgentPreviewTokenRequiredEvent = AgentSessionEventBase & { kind: "preview_token_required"; @@ -599,14 +567,11 @@ export type AgentSessionEvent = /** Discriminator values for {@link AgentSessionEvent}. */ export type AgentSessionEventKind = AgentSessionEvent["kind"]; -// --- Observability / analytics -------------------------------------------- -// The runner captures `$ai_*` AI-observability events into the team's OWN -// PostHog project (tagged `$ai_origin = 'agent_platform_runner'` and -// `$agent_application_id`). The observability surface rolls those up via HogQL -// (`/query/`) into the shapes below. These are the *derived* analytics shapes -// the client produces from raw HogQL grids — not a backend wire serializer — -// but they live here so the UI hooks can import them alongside the other -// agent-platform types. +// The runner captures `$ai_*` observability events into the team's OWN PostHog +// project (tagged `$ai_origin = 'agent_platform_runner'`, `$agent_application_id`); +// the observability surface rolls those up via HogQL. These are the *derived* +// analytics shapes the client produces from raw HogQL grids — not a backend wire +// serializer — but live here so UI hooks import them alongside the other types. export interface AgentAnalyticsKpis { spendUsd: number; diff --git a/packages/ui/src/features/agent-applications/agent-applications.module.ts b/packages/ui/src/features/agent-applications/agent-applications.module.ts deleted file mode 100644 index a64f2448cc..0000000000 --- a/packages/ui/src/features/agent-applications/agent-applications.module.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { ContainerModule } from "inversify"; - -/** - * UI module for the agent-applications feature (deployed agent_platform - * agents). Currently holds no bindings — the chat/agent builder contributions and - * any view-state slices are added in later milestones. Registered in - * apps/code/src/renderer/desktop-contributions.ts once it binds a CONTRIBUTION. - */ -export const agentApplicationsUiModule = new ContainerModule(() => {}); diff --git a/packages/ui/src/features/agent-applications/agent-builder/useAgentBuilderClientTools.ts b/packages/ui/src/features/agent-applications/agent-builder/useAgentBuilderClientTools.ts index caab1e87da..aa90a64ddd 100644 --- a/packages/ui/src/features/agent-applications/agent-builder/useAgentBuilderClientTools.ts +++ b/packages/ui/src/features/agent-applications/agent-builder/useAgentBuilderClientTools.ts @@ -20,9 +20,8 @@ export function useAgentBuilderClientTools(): ClientToolHandler { const page = useAgentBuilderStore((s) => s.page); const followRef = useRef(followMode); followRef.current = followMode; - // Latest page context without re-creating the handler each render — used to - // resolve the revision a `set_secret` punch-out targets when the agent - // doesn't name one in the tool args. + // Latest page context without re-creating the handler each render — resolves + // the revision a `set_secret` punch-out targets when the agent omits one. const pageRef = useRef(page); pageRef.current = page; diff --git a/packages/ui/src/features/agent-applications/chat/acpEnvelope.ts b/packages/ui/src/features/agent-applications/chat/acpEnvelope.ts index c4964695d4..417f3b0c39 100644 --- a/packages/ui/src/features/agent-applications/chat/acpEnvelope.ts +++ b/packages/ui/src/features/agent-applications/chat/acpEnvelope.ts @@ -91,8 +91,6 @@ export function turnCompleteMessage( }; } -// --- SessionUpdate builders ------------------------------------------------- - /** Streaming/settled assistant text fragment. */ export function agentTextUpdate(text: string): SessionUpdate { return { sessionUpdate: "agent_message_chunk", content: textBlock(text) }; diff --git a/packages/ui/src/features/agent-applications/chat/sessionEventToAcp.ts b/packages/ui/src/features/agent-applications/chat/sessionEventToAcp.ts index 9424e4b6a5..2a0f938153 100644 --- a/packages/ui/src/features/agent-applications/chat/sessionEventToAcp.ts +++ b/packages/ui/src/features/agent-applications/chat/sessionEventToAcp.ts @@ -15,6 +15,7 @@ * rendering half-streamed JSON as `rawInput` reads worse than a brief gap. */ +import type { AgentChatMapper } from "@posthog/core/agent-chat/identifiers"; import type { AcpMessage } from "@posthog/shared"; import type { AgentSessionEvent } from "@posthog/shared/agent-platform-types"; import { @@ -49,23 +50,7 @@ function outputText(value: unknown): string { } } -export interface AgentChatMapper { - /** - * Optimistically emit the user's just-sent message so it renders the instant - * they hit send, before the network round-trip. The stream echoes the same - * message back a beat later as a `user_message` event — that echo is swallowed - * (matched by text, FIFO) so it isn't rendered twice. - */ - seedUserMessage(text: string, ts?: number): AcpMessage[]; - /** - * Continue prompt (request) id numbering past `count` restored turns, so a - * follow-up message on a resumed chat doesn't collide with a turn rebuilt - * from the stored transcript. - */ - setPromptIdBase(count: number): void; - /** Translate one SSE event into zero or more ACP messages. */ - apply(event: AgentSessionEvent): AcpMessage[]; -} +export type { AgentChatMapper }; export function createAgentChatMapper(): AgentChatMapper { let promptId = 0; diff --git a/packages/ui/src/features/agent-applications/components/AgentAnalyticsView.tsx b/packages/ui/src/features/agent-applications/components/AgentAnalyticsView.tsx index ab0b63d4d0..d9b9fff0d5 100644 --- a/packages/ui/src/features/agent-applications/components/AgentAnalyticsView.tsx +++ b/packages/ui/src/features/agent-applications/components/AgentAnalyticsView.tsx @@ -99,8 +99,6 @@ export function AgentAnalyticsView({ ); } -/* ── KPIs ─────────────────────────────────────────────────────────── */ - /** * The four top-line KPI tiles (spend / sessions / failure rate / p95) with * 14-day spark trends + WoW deltas. Reused standalone on the per-agent Overview @@ -250,8 +248,6 @@ function DeltaChip({ ); } -/* ── Charts ───────────────────────────────────────────────────────── */ - function CostByModelChart({ rows }: { rows: AgentAnalyticsModelRow[] }) { const theme = useChartTheme(); if (rows.length === 0) { @@ -276,8 +272,6 @@ function CostByModelChart({ rows }: { rows: AgentAnalyticsModelRow[] }) { ); } -/* ── Tables ───────────────────────────────────────────────────────── */ - function ToolTable({ rows }: { rows: AgentAnalyticsToolRow[] }) { if (rows.length === 0) { return ; @@ -312,8 +306,6 @@ function ToolTable({ rows }: { rows: AgentAnalyticsToolRow[] }) { ); } -/* ── Primitives ───────────────────────────────────────────────────── */ - function Panel({ title, children }: { title: string; children: ReactNode }) { return (
diff --git a/packages/ui/src/features/agent-applications/components/AgentConfigurationPane.tsx b/packages/ui/src/features/agent-applications/components/AgentConfigurationPane.tsx index e1e43cde88..08815b7ec7 100644 --- a/packages/ui/src/features/agent-applications/components/AgentConfigurationPane.tsx +++ b/packages/ui/src/features/agent-applications/components/AgentConfigurationPane.tsx @@ -43,7 +43,7 @@ import { FileExplorer, type FileTreeNode } from "./FileExplorer"; import { SecretEditor } from "./SecretEditor"; import { SlackSetupCard } from "./SlackSetupCard"; -// --- value readers (spec items are loosely typed on the wire) --------------- +// Value readers — spec items are loosely typed on the wire. function rec(v: unknown): Record { return v && typeof v === "object" ? (v as Record) : {}; } @@ -136,8 +136,6 @@ function WarnBadge({ title }: { title: string }) { ); } -// --- tree ------------------------------------------------------------------- - function buildTree(spec: AgentSpec, setKeys: string[]): FileTreeNode { // Order chosen for how operators read an agent: what it is, what starts it, // what it needs, what it knows, what it can do. @@ -298,8 +296,6 @@ function buildTree(spec: AgentSpec, setKeys: string[]): FileTreeNode { return { type: "folder", name: "root", children }; } -// --- pane ------------------------------------------------------------------- - export function AgentConfigurationPane({ idOrSlug, selectedNode, @@ -403,8 +399,6 @@ export function AgentConfigurationPane({ ); } -// --- detail dispatch -------------------------------------------------------- - const SECTION_INFO: Record = { "cfg:model": "The model every request goes to. `reasoning` sets the extended-thinking budget; limits cap a run's turns, tool calls and wall time.", @@ -615,8 +609,6 @@ function byPath(files: BundleFile[], path: string): BundleFile | undefined { return files.find((f) => f.path === path); } -// --- bodies ----------------------------------------------------------------- - function ModelBody({ spec }: { spec: AgentSpec }) { return ( @@ -1265,8 +1257,6 @@ function BundleFileBody({ return {file.content}; } -// --- shared bits ------------------------------------------------------------ - function Row({ label, value, diff --git a/packages/ui/src/features/agent-applications/components/AgentMemoryPane.tsx b/packages/ui/src/features/agent-applications/components/AgentMemoryPane.tsx index d31cba7f43..d70dd732bf 100644 --- a/packages/ui/src/features/agent-applications/components/AgentMemoryPane.tsx +++ b/packages/ui/src/features/agent-applications/components/AgentMemoryPane.tsx @@ -56,8 +56,6 @@ export function AgentMemoryPane({ idOrSlug }: { idOrSlug: string }) { ); } -// --- files ------------------------------------------------------------------ - function toFileTree(node: AgentMemoryTreeNode): FileTreeNode { return { type: node.type, @@ -164,8 +162,6 @@ function MemoryFileDetail({ ); } -// --- tables ----------------------------------------------------------------- - function MemoryTables({ idOrSlug }: { idOrSlug: string }) { const [selected, setSelected] = useState(null); const { data: tables, isLoading, isError } = useAgentMemoryTables(idOrSlug); diff --git a/packages/ui/src/features/agent-applications/components/AgentRevisionBar.tsx b/packages/ui/src/features/agent-applications/components/AgentRevisionBar.tsx index f62ab8cebe..91fe18cad4 100644 --- a/packages/ui/src/features/agent-applications/components/AgentRevisionBar.tsx +++ b/packages/ui/src/features/agent-applications/components/AgentRevisionBar.tsx @@ -247,12 +247,10 @@ export function AgentRevisionBar({ {/* - * Test — runs this revision through the live ingress with a preview - * token, before it's promoted. The chat tab handles the rest (mint + - * token attach via useAgentChat). Live uses the default Chat tab; - * archived can't be exercised. Label varies by state: "Test draft" - * leans into the unfinished work; for `ready` the bundle is frozen so - * plain "Test" is more accurate. + * Test — runs this not-yet-promoted revision through the live ingress + * with a preview token (the chat tab mints + attaches it via + * useAgentChat). Live uses the default Chat tab; archived can't be + * exercised. */} {selected.state !== "live" && selected.state !== "archived" && @@ -273,10 +271,9 @@ export function AgentRevisionBar({ ) : null} {/* - * Clone to draft — fork this revision into a fresh editable draft. - * The standard exit when a ready/live/archived bundle is immutable - * but you want to keep iterating. Pre-selects the new draft so the - * picker lands you in edit mode immediately. + * Clone to draft — fork this revision into a fresh editable draft (the + * exit when a ready/live/archived bundle is immutable but you want to + * keep iterating). Pre-selects the new draft. */} {selected.state !== "draft" ? (