diff --git a/.changeset/fork-tool-horton.md b/.changeset/fork-tool-horton.md new file mode 100644 index 0000000000..4e04df8113 --- /dev/null +++ b/.changeset/fork-tool-horton.md @@ -0,0 +1,7 @@ +--- +'@electric-ax/agents': patch +--- + +Give Horton a `fork` tool that creates a child session inheriting this conversation's history up to the latest completed response. Takes an optional `entityUrl` (omit for self-fork), an optional `initialMessage` (delivered atomically — no follow-up `send` needed), and optional `tags`. The fork is created as a CHILD of the calling entity (same parent-ownership model as `spawn_worker`) and wires reply delivery through the same manifest-anchored wake — when the fork's next run finishes, the parent wakes with the response in the wake message. + +Horton's system prompt grows a "When to fork (vs spawn_worker)" section framing the two tools as a pair: both create a child the parent owns and gets replies from, the difference is what the child boots with — `spawn_worker` starts with an empty context (you brief it from scratch), `fork` starts with a copy of the conversation up to the latest completed response. Includes an explicit trigger pattern ("prefer fork when generating multiple variants the user wants to compare; don't inline") to route "give me three takes" / "evaluate these N approaches" prompts to fork rather than collapsing them into one inline response, plus the workflow for the parallel-exploration loop (end-turn-first, fork-once-per-branch with a different `initialMessage` each, wait for all responses before synthesising). diff --git a/.changeset/fork-tool-runtime.md b/.changeset/fork-tool-runtime.md new file mode 100644 index 0000000000..6b297b2aeb --- /dev/null +++ b/.changeset/fork-tool-runtime.md @@ -0,0 +1,21 @@ +--- +'@electric-ax/agents-runtime': patch +--- + +Add `ctx.fork(opts?)` to `HandlerContext`, with an opts shape that mirrors `ctx.spawn`'s where the semantics map: + +```ts +ctx.fork(opts?: { + targetEntityUrl?: string // omit for self-fork + initialMessage?: unknown // delivered atomically with creation + wake?: Wake // overrides the default runFinished + includeResponse + tags?: Record + observe?: boolean // `false` = fire-and-forget (no parent, no wake, no manifest entry) +}) +``` + +By default (`observe: true`), the new fork is a CHILD of this entity (same parent-ownership model as `ctx.spawn`), and a `runFinished + includeResponse` wake is registered on it server-side. Reply delivery uses the same manifest-anchored wake mechanism `ctx.spawn` uses — when the fork's next run finishes, this entity wakes with the response. `observe: false` opts out of the parent relationship entirely: no parent URL, no wake subscription, no manifest entry on the parent's stream. + +Internally writes a `kind: 'child'` manifest row on the parent's stream alongside the server-side wake registration, mirroring the spawn flow's bookkeeping so the relationship persists across wakes. Wired through new fields on `RuntimeServerClient.forkEntity` (`parent`, `wake`, `initialMessage`, `tags`) and `WiringConfig.forkEntity`. A `normalizeWake` helper translates the user-facing `Wake` type into the wakeRegistry-compatible shape, same logic `createOrGetChild` uses for spawn. + +The `send` tool's `payload` description now documents the canonical `{ text: "..." }` shape for chat-rendered targets (Horton sessions, agent forks) so messages emitted by `send` render as chat bubbles instead of blank bars. diff --git a/.changeset/fork-tool-server.md b/.changeset/fork-tool-server.md new file mode 100644 index 0000000000..efccc8d150 --- /dev/null +++ b/.changeset/fork-tool-server.md @@ -0,0 +1,16 @@ +--- +'@electric-ax/agents-server': patch +'@electric-ax/agents-server-ui': patch +--- + +Add server-resolved fork anchor + spawn-parity body fields to `POST /_electric/entities///fork`. + +- `anchor: 'latest_completed_run'` is an alternative to `fork_pointer`: the server scans the source root's `main` history, finds the most recent `runs` row with `status === 'completed'`, derives the matching `{ offset, sub_offset }` pointer, and runs the existing pointer-fork path with it. Mutually exclusive with `fork_pointer` (400 if both); 400 if no completed run exists. Lets callers without access to the source's per-row pointer side-table (e.g. an agent forking via a tool) fork at the same anchor the per-row "Fork from here" UI uses. +- `parent` overrides the new root fork's `parent` field, making it a CHILD of that URL (rather than inheriting the source's parent). +- `wake` registers a subscription on the new root fork at fork time (same shape as `spawn`'s `wake`). +- `initialMessage` is delivered to the new root fork via `entityManager.send` after `linkEntityDispatchSubscription` runs — same ordering spawn uses, so the dispatcher is subscribed before the inbox row lands and the fork actually wakes on the message instead of sitting idle. +- `tags` are stamped on the new root fork in addition to those copied from the source. + +Together these let an agent fork itself as a child and receive replies via the same manifest-anchored wake mechanism `spawn` uses, with a single round-trip fork-and-dispatch. + +Chat UI: `readInboxText` falls back to `message` and `content` keys when `text` isn't present, so messages sent by agents (which sometimes emit those shapes) render as a chat bubble body instead of a blank bar. diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index d0c8354fd0..2de3ce3190 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -29,6 +29,7 @@ import type { EntitySignal, EntityHandle, EntityStreamDBWithActions, + ForkOptions, HandlerContext, LLMContentBlock, LLMMessage, @@ -102,6 +103,11 @@ export interface HandlerContextConfig { observe?: boolean } ) => Promise + doFork: ( + sourceEntityUrl: string, + id: string, + opts: ForkOptions + ) => Promise doMkdb: ( id: string, schema: TSchema @@ -758,6 +764,16 @@ export function createHandlerContext( ): Promise { return config.doSpawn(type, id, args, opts) }, + fork( + sourceEntityUrl: string, + id: string, + opts?: ForkOptions + ): Promise { + return config.doFork(sourceEntityUrl, id, opts ?? {}) + }, + forkSelf(id: string, opts?: ForkOptions): Promise { + return config.doFork(config.entityUrl, id, opts ?? {}) + }, mkdb( id: string, schema: TSchema diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index 63b0c3abb7..bb46b555d2 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -7,8 +7,10 @@ import { normalizeObservationSchema } from './observation-schema' import { createWakeSession } from './wake-session' import { createHandlerContext } from './context-factory' import { createSetupContext } from './setup-context' +import type { WiringConfig } from './setup-context' import { createEntityLogPrefix, runtimeLog } from './log' import { createRuntimeServerClient } from './runtime-server-client' +import type { RuntimeServerClient } from './runtime-server-client' import { unrestrictedSandbox } from './sandbox/unrestricted' import { resolveSandboxIdentity } from './sandbox/identity' import { appendPathToUrl } from './url' @@ -36,6 +38,7 @@ import type { ObservationHandle, ObservationSource, ProcessWakeConfig, + ForkOptions, SendResult, SharedStateSchemaMap, SpawnSandboxOption, @@ -106,6 +109,36 @@ function inboxEventKey(event: ChangeEvent): string { return String(event.key) } +// Translate the user-facing `Wake` into the wakeRegistry-compatible +// shape used by `serverClient.forkEntity`. Same translation +// `createOrGetChild` does inline for spawn (subscriberUrl is fixed by +// the caller — typically the new fork's parent). +type ServerForkWake = NonNullable< + Parameters[0][`wake`] +> +function normalizeForkWake(wake: Wake, subscriberUrl: string): ServerForkWake { + const isRunFinished = + wake === `runFinished` || + (typeof wake === `object` && wake.on === `runFinished`) + const condition = isRunFinished + ? (`runFinished` as const) + : (wake as Exclude) + const result: ServerForkWake = { + subscriberUrl, + condition, + } + if (typeof wake === `object` && wake.on === `runFinished`) { + if (wake.includeResponse !== undefined) { + result.includeResponse = wake.includeResponse + } + } + if (typeof wake === `object` && wake.on === `change`) { + if (wake.debounceMs !== undefined) result.debounceMs = wake.debounceMs + if (wake.timeoutMs !== undefined) result.timeoutMs = wake.timeoutMs + } + return result +} + function toError(err: unknown): Error { return err instanceof Error ? err : new Error(String(err)) } @@ -1328,6 +1361,33 @@ export async function processWake( }) }, + forkEntity: async ( + sourceEntityUrl: string, + opts?: Parameters[1] + ): Promise<{ entityUrl: string; streamPath: string }> => { + // Normalize the user-facing Wake into the wakeRegistry- + // compatible shape — same translation `createOrGetChild` does + // for spawn. subscriberUrl is derived from `opts.parent` + // (the only valid target after the route's wake validation). + const wakeOpt = + opts?.wake && opts.parent + ? normalizeForkWake(opts.wake, opts.parent) + : undefined + const result = await serverClient.forkEntity({ + sourceEntityUrl, + ...(opts?.instanceId !== undefined && { + instanceId: opts.instanceId, + }), + ...(opts?.parent !== undefined && { parent: opts.parent }), + ...(wakeOpt && { wake: wakeOpt }), + ...(opts?.initialMessage !== undefined && { + initialMessage: opts.initialMessage, + }), + ...(opts?.tags !== undefined && { tags: opts.tags }), + }) + return { entityUrl: result.entityUrl, streamPath: result.streamPath } + }, + createChildDb: async ( childStreamUrl: string, childTypeName?: string, @@ -1749,6 +1809,14 @@ export async function processWake( return setupCtx.spawn(type, id, spawnArgs, opts) } + const doFork = ( + sourceEntityUrl: string, + id: string, + opts: ForkOptions + ): Promise => { + return setupCtx.fork(sourceEntityUrl, id, opts) + } + const doMkdb = ( id: string, schema: TSchema @@ -2039,6 +2107,7 @@ export async function processWake( hydratedEventSourceWake: await hydrateCurrentEventSourceWake(), doObserve, doSpawn, + doFork, doMkdb, doCreateAttachment: (attachment) => serverClient diff --git a/packages/agents-runtime/src/runtime-server-client.ts b/packages/agents-runtime/src/runtime-server-client.ts index 3142059414..702d15adae 100644 --- a/packages/agents-runtime/src/runtime-server-client.ts +++ b/packages/agents-runtime/src/runtime-server-client.ts @@ -121,6 +121,37 @@ export interface RuntimeServerClient { id: string }) => Promise spawnEntity: (options: SpawnEntityOptions) => Promise + /** + * Fork an entity at the server-resolved `latest_completed_run` anchor. + * Resolves to the new root entity's info. Wraps the agents-server + * `POST /_electric/entities///fork` endpoint. + * + * Optional fields mirror `spawnEntity`: + * - `parent` makes the new fork a child of that URL. + * - `wake` registers a subscription at fork time (reply delivery + * uses the parent's manifest-anchored wake when paired with a + * manifest entry on the parent — same model as `spawn`). + * - `initialMessage` delivers an inbox message to the new fork + * atomically with creation, folding fork+send into one round-trip. + * - `tags` stamps tags onto the new fork in addition to those + * copied from the source. + */ + forkEntity: (options: { + sourceEntityUrl: string + /** Maps to the server's `instance_id` body field. */ + instanceId?: string + parent?: string + wake?: { + subscriberUrl: string + condition: RegisterWakeOptions[`condition`] + debounceMs?: number + timeoutMs?: number + includeResponse?: boolean + manifestKey?: string + } + initialMessage?: unknown + tags?: Record + }) => Promise getEntity: (entityUrl: string) => Promise ensureSharedStateStream: ( sharedStateId: string, @@ -449,6 +480,53 @@ export function createRuntimeServerClient( return entityInfo } + const forkEntity = async ({ + sourceEntityUrl, + instanceId, + parent, + wake, + initialMessage, + tags, + }: { + sourceEntityUrl: string + instanceId?: string + parent?: string + wake?: { + subscriberUrl: string + condition: RegisterWakeOptions[`condition`] + debounceMs?: number + timeoutMs?: number + includeResponse?: boolean + manifestKey?: string + } + initialMessage?: unknown + tags?: Record + }): Promise => { + const body: Record = { + anchor: `latest_completed_run`, + } + if (instanceId !== undefined) body.instance_id = instanceId + if (parent !== undefined) body.parent = parent + if (wake !== undefined) body.wake = wake + if (initialMessage !== undefined) body.initialMessage = initialMessage + if (tags !== undefined) body.tags = tags + const response = await request(entityRpcPath(sourceEntityUrl, `/fork`), { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify(body), + }) + if (!response.ok) { + throw new Error( + `fork ${sourceEntityUrl} failed (${response.status}): ${await readErrorText(response)}` + ) + } + const payload = (await response.json()) as { root?: RuntimeEntityResponse } + return requireEntityInfo( + payload.root, + `fork ${sourceEntityUrl} returned an invalid root payload` + ) + } + const ensureSharedStateStream = async ( sharedStateId: string, ownerEntityUrl?: string @@ -778,6 +856,7 @@ export function createRuntimeServerClient( createAttachment, readAttachment, spawnEntity, + forkEntity, getEntity, ensureSharedStateStream, signalEntity, diff --git a/packages/agents-runtime/src/setup-context.ts b/packages/agents-runtime/src/setup-context.ts index a73f7f76e8..4e64c6c633 100644 --- a/packages/agents-runtime/src/setup-context.ts +++ b/packages/agents-runtime/src/setup-context.ts @@ -68,6 +68,42 @@ export interface WiringConfig { sandbox?: SpawnSandboxOption } ) => Promise<{ entityUrl: string; streamPath: string }> + /** + * Fork a top-level entity at the server-resolved latest completed + * run. Returns the new root entity's URL + main stream path. + * + * Optional `parent` makes the new fork a child of that URL; pair with + * `wake` to register a subscription at fork time. The `condition` + * here is the agents-server's normalized wake shape (`'runFinished'` + * or `{ on: 'change', ... }`) — callers above this layer (`doFork`) + * translate the user-facing `Wake` into this form, the same way + * createOrGetChild does for spawn. + */ + forkEntity: ( + sourceEntityUrl: string, + opts?: { + /** + * Caller-supplied instance id for the new fork (wired to the + * server's `instance_id` body field). Omit to let the server + * mint one (currently `-fork-`). + */ + instanceId?: string + /** + * Parent URL for the new fork. When set, the fork becomes a + * child of this URL and the wake registration's subscriberUrl + * is derived from it (matching the spawn route's contract). + */ + parent?: string + /** + * User-facing Wake; the wiring impl normalizes it into the + * wakeRegistry-compatible shape before sending to the server + * — same translation `createOrGetChild` does for spawn. + */ + wake?: Wake + initialMessage?: unknown + tags?: Record + } + ) => Promise<{ entityUrl: string; streamPath: string }> /** Create a child StreamDB, preload it, and register it for cleanup. */ createChildDb: ( streamUrl: string, @@ -131,6 +167,26 @@ export interface SetupContextResult { observe?: boolean } ) => Promise + /** + * Fork a source entity at its latest completed run. Returns an + * EntityHandle for the new fork — same shape spawn returns — so the + * caller can `await fork.run` etc. The fork is created as a CHILD + * of this setup-context's entity (parent = entityUrl) unless + * `observe: false`. Mirrors spawn's flow: builds the handle, wires a + * spawn handle on wakeSession to resolve the run promise on + * runFinished, then calls `wiring.forkEntity` to do the server-side + * fork. + */ + fork: ( + sourceEntityUrl: string, + id: string, + opts?: { + initialMessage?: unknown + wake?: Wake + tags?: Record + observe?: boolean + } + ) => Promise send: ( entityUrl: string, payload: unknown, @@ -1200,6 +1256,232 @@ export function createSetupContext( return handle }, + async fork( + sourceEntityUrl: string, + id: string, + opts?: { + initialMessage?: unknown + wake?: Wake + tags?: Record + observe?: boolean + } + ): Promise { + const observeChild = opts?.observe !== false + // The fork's type is the source's type — fork is a copy, not a + // new entity type. Parse it from the source URL. + const parsedSourceUrl = sourceEntityUrl + .split(`/`) + .filter((segment) => segment.length > 0) + const type = parsedSourceUrl[0] ?? `unknown` + const childKey = manifestChildKey(type, id) + const childRow = (entityUrl: string): ManifestChildEntry => ({ + kind: `child`, + key: childKey, + id, + entity_type: type, + entity_url: entityUrl, + observed: true, + ...(opts?.wake ? { wake: opts.wake } : {}), + }) + + let runResolve: () => void + let forkError: Error | null = null + let runPromise = new Promise((resolve) => { + runResolve = resolve + }) + + // The server constructs the fork URL as `//` when + // `instance_id` is supplied (current code path), so we know it up + // front. Same property spawn relies on for its handle URL. + let realEntityUrl = `/${type}/${id}` + + const handle: EntityHandle = { + sourceType: `entity`, + get sourceRef() { + return realEntityUrl + }, + get entityUrl() { + return realEntityUrl + }, + type, + db: null as unknown as EntityStreamDB, + events: [], + get run(): Promise { + if (inSetup) { + throw new Error( + `fork.run cannot be called during setup() — use it in effects instead` + ) + } + if (!observeChild) { + return Promise.reject( + new Error( + `fork.run is unavailable — fork(..., { observe: false }) opted out of child observation` + ) + ) + } + if (forkError) { + return Promise.reject(forkError) + } + return runPromise + }, + async text(): Promise> { + if (!observeChild) { + throw new Error( + `fork.text is unavailable — fork(..., { observe: false }) opted out of child observation` + ) + } + await this.run + return readCompletedRunTexts(this.db) + }, + send: (msg: unknown) => { + if (inSetup) { + throw new Error( + `fork.send() cannot be called during setup() — use it in effects instead` + ) + } + if (forkError) { + throw forkError + } + const result = dispatchSend({ + targetUrl: realEntityUrl, + payload: msg, + }) + runPromise = new Promise((resolve) => { + runResolve = resolve + }) + return result + }, + status: () => { + const entries = db.collections.childStatus?.toArray as + | Array + | undefined + return entries?.find( + (e) => + e.entity_url === realEntityUrl || + e.entity_url === `/${type}/${id}` + ) + }, + } + + // ---- Inline wiring (production path) ---- + if (wiring) { + // Mirror spawn's order: register the spawn handle first so the + // runFinished wake can find it on the back-edge; then create + // the server-side fork. + wakeSession.registerSpawnHandle(id, { + wireDb: () => {}, + resolveRun: () => { + runResolve!() + }, + rejectRun: (reason: Error) => { + forkError = reason + }, + updateEntityUrl: (newUrl: string) => { + realEntityUrl = newUrl + if (observeChild) { + wakeSession.registerManifestEntry(childRow(newUrl)) + } + }, + }) + + try { + // For an observed (default) fork, register a wake on the + // new fork firing back to this entity. Default to + // `runFinished + includeResponse` when the caller didn't + // pass a wake — matches spawn's default child-observation + // shape. The wiring impl translates user-facing `Wake` into + // the wakeRegistry's `{ subscriberUrl, condition, ... }`. + const wakeForFork: Wake | undefined = observeChild + ? (opts?.wake ?? { + on: `runFinished`, + includeResponse: true, + }) + : undefined + const { entityUrl: forkUrl, streamPath } = await wiring.forkEntity( + sourceEntityUrl, + { + instanceId: id, + ...(observeChild && { parent: entityUrl }), + ...(wakeForFork && { wake: wakeForFork }), + ...(opts?.initialMessage !== undefined && { + initialMessage: opts.initialMessage, + }), + ...(opts?.tags && { tags: opts.tags }), + } + ) + realEntityUrl = forkUrl + if (observeChild) { + wakeSession.registerManifestEntry(childRow(forkUrl)) + + const childDb = await wiring.createChildDb( + `${config.serverBaseUrl}${streamPath}`, + type, + (event) => { + if ( + event.type === `run` && + event.headers.operation === `update` + ) { + const val = event.value as { status?: string } | undefined + if (val?.status === `completed` || val?.status === `failed`) { + runResolve!() + } + } + } + ) + handle.db = childDb + + const runs = childDb.collections.runs?.toArray as + | Array<{ key: string; status: string }> + | undefined + if (runs) { + const latestRun = runs[runs.length - 1] + if ( + latestRun && + (latestRun.status === `completed` || + latestRun.status === `failed`) + ) { + runResolve!() + } + } + } + } catch (err) { + forkError = err instanceof Error ? err : new Error(String(err)) + throw forkError + } + + observeHandleCache.set(realEntityUrl, handle) + + return handle + } + + // ---- Deferred wiring (unit test path) ---- same shape as spawn. + const dbReady = new Promise((resolveDb, rejectDb) => { + wakeSession.registerSpawnHandle(id, { + wireDb: (childDb: EntityStreamDBWithActions) => { + handle.db = childDb + resolveDb() + }, + resolveRun: () => { + runResolve!() + }, + rejectRun: (reason: Error) => { + forkError = reason + rejectDb(reason) + }, + updateEntityUrl: (newUrl: string) => { + realEntityUrl = newUrl + wakeSession.registerManifestEntry(childRow(newUrl)) + }, + }) + }) + + wakeSession.registerManifestEntry(childRow(realEntityUrl)) + + await dbReady + + return handle + }, + send( targetUrl: string, payload: unknown, diff --git a/packages/agents-runtime/src/tools/send.ts b/packages/agents-runtime/src/tools/send.ts index 2a58b5961e..09d7cdedd5 100644 --- a/packages/agents-runtime/src/tools/send.ts +++ b/packages/agents-runtime/src/tools/send.ts @@ -45,7 +45,7 @@ export function createSendTool( }) ), payload: Type.Any({ - description: `Message payload to deliver to the target entity.`, + description: `Message payload to deliver to the target entity. For chat-rendered targets (Horton sessions, your own forks), use the shape \`{ text: "..." }\` so the message body shows up in the chat UI — that's the same shape the user's chat input produces. A plain string also works. Other shapes are delivered as-is but won't render as a chat bubble body.`, }), type: Type.Optional( Type.String({ description: `Optional message type.` }) diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index 7aaa5180a7..b49bb67695 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -829,6 +829,28 @@ export type Wake = timeoutMs?: number } +/** + * Options bag for `ctx.fork` / `ctx.forkSelf`. Mirrors `ctx.spawn`'s + * opts shape where the semantics map. + */ +export interface ForkOptions { + /** Inbox message delivered to the new fork atomically with creation. */ + initialMessage?: unknown + /** + * Wake subscription registered on the new fork. Defaults to + * `{ on: 'runFinished', includeResponse: true }` when omitted. + */ + wake?: Wake + /** Tags stamped on the new fork in addition to those copied from the source. */ + tags?: Record + /** + * `false` opts out of the parent relationship entirely (no parent + * link, no wake registration, no manifest entry). Fire-and-forget. + * Mirrors `spawn`'s `observe: false`. Defaults to `true`. + */ + observe?: boolean +} + export type WakeMessage = Omit export type WakeEvent = { @@ -987,6 +1009,46 @@ export interface HandlerContext< sandbox?: SpawnSandboxOption } ) => Promise + /** + * Fork a session at the latest completed run on its `main` stream. + * The new fork is created as a CHILD of this entity (same parent- + * ownership model as `spawn`), and a `runFinished + includeResponse` + * wake is registered on it at fork time. Reply delivery uses the + * parent's manifest-anchored wake — the same mechanism `spawn` uses + * — so when the fork's next run finishes, this entity wakes with the + * response in the wake message. + * + * Positional shape matches `ctx.spawn(type, id, args?, opts?)`: + * `ctx.fork(sourceEntityUrl, id, opts?)` + * + * - `sourceEntityUrl` — the entity whose history the fork inherits. + * Use `ctx.forkSelf(...)` to fork yourself (omits this arg). + * - `id` — the new fork's instance id (the `` in `/horton/`). + * Required for the same reason `spawn`'s id is required: the caller + * knows the new URL up front and the request is idempotent on retry + * (same id → server deduplicates). Tool wrappers that target the + * model layer generate this via `nanoid` — see `createForkTool`. + * + * Options mirror `spawn` where the semantics map: + * - `initialMessage` is delivered to the fork's inbox atomically + * with creation, folding the fork+send pattern into one call. + * - `wake` overrides the default `runFinished + includeResponse` + * subscription (e.g. to set debounce for high-fanout forking). + * - `tags` are stamped on top of the tags copied from the source. + * - `observe: false` opts out of the parent relationship entirely: + * no wake, no manifest entry, no reply path — fire-and-forget. + */ + fork: ( + sourceEntityUrl: string, + id: string, + opts?: ForkOptions + ) => Promise + /** + * Convenience wrapper for the common self-fork case — equivalent to + * `ctx.fork(ctx.entityUrl, id, opts)`. Mirrors `spawn`'s ergonomics + * (where you also don't have to spell out who the parent is). + */ + forkSelf: (id: string, opts?: ForkOptions) => Promise observe: (( source: ObservationSource & { sourceType: `entity` }, opts?: { wake?: Wake } diff --git a/packages/agents-runtime/test/context-factory.test.ts b/packages/agents-runtime/test/context-factory.test.ts index 166ecb96b0..da09faaddc 100644 --- a/packages/agents-runtime/test/context-factory.test.ts +++ b/packages/agents-runtime/test/context-factory.test.ts @@ -97,6 +97,7 @@ describe(`createHandlerContext`, () => { }, doObserve: vi.fn(), doSpawn: vi.fn(), + doFork: vi.fn(), doMkdb: vi.fn(), executeSend: vi.fn(), tags: {}, @@ -223,6 +224,7 @@ describe(`createHandlerContext`, () => { hydratedEventSourceWake, doObserve: vi.fn(), doSpawn: vi.fn(), + doFork: vi.fn(), doMkdb: vi.fn(), executeSend: vi.fn(), tags: {}, diff --git a/packages/agents-runtime/test/helpers/context-test-helpers.ts b/packages/agents-runtime/test/helpers/context-test-helpers.ts index 4b26ede32d..19a88bd2f8 100644 --- a/packages/agents-runtime/test/helpers/context-test-helpers.ts +++ b/packages/agents-runtime/test/helpers/context-test-helpers.ts @@ -335,6 +335,7 @@ export function createTestHandlerContext( hydratedEventSourceWake: opts.hydratedEventSourceWake, doObserve: vi.fn(), doSpawn: vi.fn(), + doFork: vi.fn(), doMkdb: vi.fn(), executeSend: vi.fn(), doSetTag: vi.fn(async () => undefined), diff --git a/packages/agents-runtime/test/record-run.test.ts b/packages/agents-runtime/test/record-run.test.ts index 40424522ad..927188da93 100644 --- a/packages/agents-runtime/test/record-run.test.ts +++ b/packages/agents-runtime/test/record-run.test.ts @@ -89,6 +89,7 @@ function buildHarness(opts?: { existingRunKeys?: Array }): { }, doObserve: vi.fn(), doSpawn: vi.fn(), + doFork: vi.fn(), doMkdb: vi.fn(), executeSend: vi.fn(), tags: {}, diff --git a/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts b/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts index 3bb93b08d2..89e3839644 100644 --- a/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts +++ b/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts @@ -270,6 +270,7 @@ describe(`createHandlerContext: tags + tag mutations`, () => { wakeEvent: { type: `inbox`, payload: `hi` } as any, doObserve: () => Promise.resolve({} as any), doSpawn: () => Promise.resolve({} as any), + doFork: () => Promise.resolve({} as any), doMkdb: () => ({}) as any, executeSend: async () => ({ sent: true, targetUrl: `/horton/x` }), doSetTag: async (key, value) => { diff --git a/packages/agents-runtime/test/setup-context.test.ts b/packages/agents-runtime/test/setup-context.test.ts index 73a07ae134..a95e538d2f 100644 --- a/packages/agents-runtime/test/setup-context.test.ts +++ b/packages/agents-runtime/test/setup-context.test.ts @@ -2198,6 +2198,7 @@ describe(`entity patterns`, () => { customStateNames: [], wiring: { createOrGetChild: vi.fn(), + forkEntity: vi.fn(), createChildDb, createSourceDb: vi.fn(), createSharedStateDb: vi.fn(), diff --git a/packages/agents-server-ui/src/components/EntityTimeline.tsx b/packages/agents-server-ui/src/components/EntityTimeline.tsx index 7593b6e44a..968991f6df 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.tsx +++ b/packages/agents-server-ui/src/components/EntityTimeline.tsx @@ -75,11 +75,20 @@ function renderRowKey(row: RenderTimelineRow): string { } function readInboxText(payload: unknown): string { + if (typeof payload === `string`) return payload if (payload && typeof payload === `object`) { - const text = (payload as { text?: unknown }).text - if (typeof text === `string`) return text + // Prefer the canonical `text` key (what the chat input emits and what + // the `send` tool's description recommends), then fall back to + // `message` / `content` since agents sometimes emit those when the + // shape guidance isn't internalised. Keeps casually-shaped agent-to- + // agent sends visible in the chat instead of rendering as a blank. + const candidates = [`text`, `message`, `content`] as const + for (const key of candidates) { + const value = (payload as Record)[key] + if (typeof value === `string`) return value + } } - return typeof payload === `string` ? payload : `` + return `` } function stringifySearchPayload(value: unknown): string { diff --git a/packages/agents-server-ui/src/lib/sendMessage.ts b/packages/agents-server-ui/src/lib/sendMessage.ts index eee1cec177..f84de05530 100644 --- a/packages/agents-server-ui/src/lib/sendMessage.ts +++ b/packages/agents-server-ui/src/lib/sendMessage.ts @@ -292,11 +292,15 @@ export async function sendEntityMessage({ } export function readTextPayload(payload: unknown): string { + if (typeof payload === `string`) return payload if (payload && typeof payload === `object`) { - const text = (payload as { text?: unknown }).text - if (typeof text === `string`) return text + const candidates = [`text`, `message`, `content`] as const + for (const key of candidates) { + const value = (payload as Record)[key] + if (typeof value === `string`) return value + } } - return typeof payload === `string` ? payload : `` + return `` } function principalUrl(principalKey: string): string { diff --git a/packages/agents-server/src/entity-manager.ts b/packages/agents-server/src/entity-manager.ts index 478601f8d9..ec4b2f1ae0 100644 --- a/packages/agents-server/src/entity-manager.ts +++ b/packages/agents-server/src/entity-manager.ts @@ -173,6 +173,38 @@ type ForkSubtreeOptions = { * regardless. */ forkPointer?: EventPointer + /** + * Named server-resolved anchor. Resolves to a concrete `forkPointer` + * by reading the source root's `main` history. Mutually exclusive with + * `forkPointer`. Use this when the caller doesn't have access to the + * source's per-row pointer side-table (e.g. an agent forking a session + * via a tool). + * + * - `latest_completed_run` — the most recent `runs` row on the source + * with `status === 'completed'`. Errors if no such row exists. This + * is the eligibility rule the per-row "Fork from here" UI uses. + */ + anchor?: `latest_completed_run` + /** + * Optional parent URL for the new root fork entity. When set, the + * new fork is a CHILD of this URL — its `parent` field is overridden + * (rather than inherited from the source, which is `null` for the + * only allowed source: a top-level entity). Pairs with `wake` to give + * the parent reply-delivery the same way spawn does. + */ + parent?: string + /** + * Optional wake subscription registered on the new root fork at fork + * time. Same shape and semantics as `spawn`'s `wake` (see + * `TypedSpawnRequest.wake`). Typically set by the agent `fork` tool + * to wire reply delivery via the parent's manifest-anchored wake. + */ + wake?: TypedSpawnRequest[`wake`] + /** + * Optional tags stamped onto the new root fork entity in addition + * to those copied from the source. Mirrors `spawn`'s `tags`. + */ + tags?: Record } type ForkEntityPlan = { @@ -905,6 +937,13 @@ export class EntityManager { const writeEntityLocks = new Set() const writeStreamLocks = new Set() + // Anchor-forks resolve to a concrete pointer server-side by reading the + // source's `main` history. Below this point the resolved pointer (or + // an explicit one from `opts.forkPointer`) drives the same code path — + // anchor handling collapses entirely into the pointer-fork flow. + const usePointerPath = + opts.forkPointer !== undefined || opts.anchor !== undefined + try { // For pointer-forks we read the source root HISTORICALLY at a // frozen offset, so concurrent activity on the root past the @@ -916,7 +955,7 @@ export class EntityManager { // those are HEAD-cloned and need a stable snapshot. For HEAD-forks // the old all-idle requirement still applies. let sourceTree: Array - if (opts.forkPointer) { + if (usePointerPath) { const rootEntity = await this.registry.getEntity(rootUrl) if (!rootEntity) { throw new ElectricAgentsError( @@ -945,11 +984,11 @@ export class EntityManager { ) } - // When forking at a pointer, pre-read the root's main, validate the - // pointer against the source's true history, and materialise the - // root-at-pointer snapshot fragments. The pointer only applies to - // the root's `main` stream. Descendants kept by the manifest filter - // are forked at HEAD. + // When forking at a pointer (or anchor), pre-read the root's main, + // validate the pointer against the source's true history, and + // materialise the root-at-pointer snapshot fragments. The pointer + // only applies to the root's `main` stream. Descendants kept by + // the manifest filter are forked at HEAD. // // Pointer→position translation: the runtime mints pointers as // `{ offset: previousBatchOffset, subOffset: itemIndex+1 }`, where @@ -968,16 +1007,32 @@ export class EntityManager { sharedStateIds: Set } | undefined - if (opts.forkPointer) { + let effectiveForkPointer: EventPointer | undefined = opts.forkPointer + if (usePointerPath) { const sourceEvents = await this.streamClient.readJson< Record >(sourceRoot.streams.main) const flat = sourceEvents.flatMap((item) => Array.isArray(item) ? item : [item] ) as Array> + if (!effectiveForkPointer && opts.anchor === `latest_completed_run`) { + effectiveForkPointer = this.resolveLatestCompletedRunPointer( + flat, + sourceRoot.streams.main + ) + } + if (!effectiveForkPointer) { + // Defensive — would only fire on a future anchor variant we + // forget to handle above. + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Internal: pointer-path fork with no resolved pointer`, + 500 + ) + } const target = this.resolveForkPointerTarget( flat, - opts.forkPointer, + effectiveForkPointer, sourceRoot.streams.main ) const filteredEvents = flat.slice(0, target) @@ -1009,7 +1064,7 @@ export class EntityManager { // subtree except the root) are HEAD-cloned, so they must be idle // before we read their snapshots. Wait+lock only those — the root // was skipped above. - if (opts.forkPointer) { + if (usePointerPath) { const descendants = effectiveSubtree.filter( (entity) => entity.url !== sourceRoot.url ) @@ -1061,6 +1116,28 @@ export class EntityManager { stringMap ) + // Override fields on the new root fork's plan from caller opts. + // Plumbed through from `forkBodySchema` — descendants in + // `effectiveSubtree` keep what `buildForkEntityPlans` copied. + // + // - `parent`: child-fork relationship (vs the default inheritance + // from the source, which is `null` for a top-level source). + // - `tags`: merged on top of source tags (caller-supplied wins). + const rootPlanForOverrides = entityPlans.find( + (plan) => plan.source.url === rootUrl + ) + if (rootPlanForOverrides) { + if (opts.parent !== undefined) { + rootPlanForOverrides.fork.parent = opts.parent + } + if (opts.tags !== undefined) { + rootPlanForOverrides.fork.tags = { + ...(rootPlanForOverrides.fork.tags ?? {}), + ...opts.tags, + } + } + } + this.addForkLocks( this.forkWriteLockedEntities, effectiveSubtree.map((entity) => entity.url), @@ -1085,8 +1162,8 @@ export class EntityManager { await this.streamClient.fork( plan.fork.streams.main, plan.source.streams.main, - isRoot && opts.forkPointer - ? { forkPointer: opts.forkPointer } + isRoot && effectiveForkPointer + ? { forkPointer: effectiveForkPointer } : undefined ) createdStreams.push(plan.fork.streams.main) @@ -1148,6 +1225,30 @@ export class EntityManager { createdEntities.push(plan.fork.url) } + // Register a wake subscription on the new root fork when the + // caller asked for one. Mirrors the spawn flow's wake handling + // (entity-manager.spawnInner). Rollback below already calls + // `wakeRegistry.unregisterBySource(forkUrl)` for every created + // entity, so this cleans up automatically on failure. + if (opts.wake !== undefined) { + const rootPlan = entityPlans.find( + (plan) => plan.source.url === rootUrl + ) + if (rootPlan) { + await this.wakeRegistry.register({ + tenantId: this.tenantId, + subscriberUrl: opts.wake.subscriberUrl, + sourceUrl: rootPlan.fork.url, + condition: opts.wake.condition, + debounceMs: opts.wake.debounceMs, + timeoutMs: opts.wake.timeoutMs, + oneShot: false, + includeResponse: opts.wake.includeResponse, + manifestKey: opts.wake.manifestKey, + }) + } + } + for (const plan of entityPlans) { const manifests = activeManifestsByEntity.get(plan.fork.url) ?? new Map() @@ -1463,6 +1564,58 @@ export class EntityManager { return positionAtAnchor + pointer.subOffset } + /** + * Find an `EventPointer` that addresses the most recent `runs` row on + * the source's `main` stream with `status === 'completed'`. Mirrors the + * eligibility rule the per-row "Fork from here" UI applies — only + * completed runs are valid fork anchors; `started`/`failed` rows are + * skipped. + * + * The pointer is computed in the same coordinate system the runtime + * mints pointers in (see entity-stream-db's onBeforeBatch): for a row + * R in log entry E, the pointer is `{ offset: P, subOffset: K }` where + * `P` is the END offset of the log entry preceding E (or `null` for + * stream start) and `K` is R's 1-indexed position within E. + */ + private resolveLatestCompletedRunPointer( + events: ReadonlyArray>, + streamPath: string + ): EventPointer { + let priorEntryOffset: string | null = null + let currentEntryOffset: string | null = null + let positionInEntry = 0 + let latest: EventPointer | null = null + + for (const event of events) { + const headers = isRecord(event.headers) ? event.headers : undefined + const eventOffset = + typeof headers?.offset === `string` ? headers.offset : null + + if (eventOffset !== currentEntryOffset) { + priorEntryOffset = currentEntryOffset + currentEntryOffset = eventOffset + positionInEntry = 0 + } + positionInEntry++ + + if (event.type !== `run`) continue + if (headers?.operation === `delete`) continue + if (!isRecord(event.value)) continue + if (event.value.status !== `completed`) continue + + latest = { offset: priorEntryOffset, subOffset: positionInEntry } + } + + if (!latest) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Source ${streamPath} has no completed run to fork from`, + 400 + ) + } + return latest + } + /** * Compute the subset of `sourceTree` that survives the manifest filter * applied at the root. After filtering the root's manifest at the fork diff --git a/packages/agents-server/src/routing/entities-router.ts b/packages/agents-server/src/routing/entities-router.ts index 71a70ae297..3e0de98246 100644 --- a/packages/agents-server/src/routing/entities-router.ts +++ b/packages/agents-server/src/routing/entities-router.ts @@ -195,6 +195,41 @@ const forkBodySchema = Type.Object({ sub_offset: Type.Number(), }) ), + // Named server-resolved anchor. Resolves to a concrete fork pointer on + // the source root's `main` server-side, so callers don't need access + // to the source's per-row pointer side-table. Mutually exclusive with + // `fork_pointer`. + anchor: Type.Optional(Type.Literal(`latest_completed_run`)), + // Optional parent URL. When set, the new root fork is a CHILD of this + // URL (rather than inheriting the source's parent, which is `null` + // for the only allowed source — a top-level entity). Used by the + // agent `fork` tool to make a forking entity own its forks the same + // way a spawning entity owns its workers. + parent: Type.Optional(Type.String()), + // Optional wake subscription registered on the new root fork at fork + // time. Mirrors the `wake` field on spawn — the subscriber URL gets + // woken when the fork meets the named condition, with the response + // optionally inlined. Used to wire fork-as-child reply delivery + // through the same manifest-anchored mechanism spawn uses. + wake: Type.Optional( + Type.Object({ + subscriberUrl: Type.String(), + condition: wakeConditionSchema, + debounceMs: Type.Optional(Type.Number()), + timeoutMs: Type.Optional(Type.Number()), + includeResponse: Type.Optional(Type.Boolean()), + manifestKey: Type.Optional(Type.String()), + }) + ), + // Optional initial inbox message delivered to the new root fork + // immediately after creation. Atomic with the fork RPC — folds the + // common "fork then send" pattern into a single round-trip and means + // a partial failure can't leave an idle fork on the parent's + // manifest. Mirrors spawn's `initialMessage` field. + initialMessage: Type.Optional(Type.Unknown()), + // Optional tags stamped on the new root fork entity in addition to + // those copied from the source. Mirrors spawn's `tags`. + tags: Type.Optional(stringRecordSchema), }) const setTagBodySchema = Type.Object({ @@ -1064,8 +1099,57 @@ async function forkEntity( if (principalMutationError) return principalMutationError const parsed = routeBody(request) + if (parsed.fork_pointer && parsed.anchor) { + return apiError( + 400, + ErrCodeInvalidRequest, + `fork_pointer and anchor are mutually exclusive` + ) + } const { entityUrl, entity } = requireExistingEntityRoute(request) await assertDispatchPolicyAllowed(ctx, entity.dispatch_policy) + + // Validate `parent` and `wake.subscriberUrl` before forking — mirrors + // the spawn route's parent-validation flow. Without these checks, a + // direct HTTP caller could attach a fork under an arbitrary parent or + // register a wake firing to an arbitrary subscriber. + if (parsed.parent !== undefined) { + const parent = await ctx.entityManager.registry.getEntity(parsed.parent) + if (!parent) { + return apiError(404, ErrCodeNotFound, `Parent entity not found`) + } + if (!(await canAccessEntity(ctx, parent, `spawn`, request as Request))) { + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to spawn children from ${parent.url}` + ) + } + } + if (parsed.wake !== undefined) { + // The only sensible target for a fork's wake is the new fork's + // parent (so the parent gets woken when the fork's run finishes + // — same model as spawn). Require parent + matching subscriber to + // prevent a caller from registering a wake firing to an entity + // they don't own. If subscriber-flexibility ever becomes a real + // use case, replace this with a proper canAccessEntity check on + // the subscriber. + if (parsed.parent === undefined) { + return apiError( + 400, + ErrCodeInvalidRequest, + `wake requires parent (the fork's wake fires to its parent)` + ) + } + if (parsed.wake.subscriberUrl !== parsed.parent) { + return apiError( + 401, + ErrCodeUnauthorized, + `wake.subscriberUrl must match parent` + ) + } + } + const result = await ctx.entityManager.forkSubtree(entityUrl, { rootInstanceId: parsed.instance_id, waitTimeoutMs: parsed.waitTimeoutMs, @@ -1075,10 +1159,24 @@ async function forkEntity( subOffset: parsed.fork_pointer.sub_offset, }, }), + ...(parsed.anchor && { anchor: parsed.anchor }), + ...(parsed.parent !== undefined && { parent: parsed.parent }), + ...(parsed.wake !== undefined && { wake: parsed.wake }), + ...(parsed.tags !== undefined && { tags: parsed.tags }), }) for (const forkedEntity of result.entities) { await linkEntityDispatchSubscription(ctx, forkedEntity) } + // Deliver the initial message via entityManager.send AFTER the + // dispatch subscription is linked — same ordering spawn uses. Sending + // before linking would land the inbox row on the stream before the + // dispatcher is subscribed, and the dispatcher would never pick it up. + if (parsed.initialMessage !== undefined) { + await ctx.entityManager.send(result.root.url, { + from: parsed.parent ?? ctx.principal.url, + payload: parsed.initialMessage, + }) + } return json( { root: toPublicEntity(result.root), diff --git a/packages/agents-server/test/dispatch-policy-routing.test.ts b/packages/agents-server/test/dispatch-policy-routing.test.ts index 21792b7c5b..0aab3f4fd1 100644 --- a/packages/agents-server/test/dispatch-policy-routing.test.ts +++ b/packages/agents-server/test/dispatch-policy-routing.test.ts @@ -43,11 +43,16 @@ function buildContext(overrides: Partial = {}): TenantContext { } return { service: `tenant-test`, + // dev-local built-in system principal bypasses permission checks + // (these tests assert dispatch wiring, not authz; permission + // enforcement landed on main after this test was last touched). + // The runner mock below has its owner_principal set to the same + // url so assertDispatchPolicyAllowed's owner check passes too. principal: { - kind: `user`, - id: `owner@example.com`, - key: `user:owner@example.com`, - url: `/principal/user%3Aowner%40example.com`, + kind: `system`, + id: `dev-local`, + key: `system:dev-local`, + url: `/principal/system:dev-local`, }, publicUrl: `http://server`, durableStreamsUrl: `http://durable.local`, @@ -72,7 +77,7 @@ function buildContext(overrides: Partial = {}): TenantContext { ), getRunner: vi.fn(async () => ({ id: `runner-1`, - owner_principal: `/principal/user%3Aowner%40example.com`, + owner_principal: `/principal/system:dev-local`, label: `Local runner`, kind: `local`, admin_status: `enabled`, @@ -212,7 +217,7 @@ describe(`dispatch policy routing`, () => { }) ) expect(ctx.entityManager.send).toHaveBeenCalledWith(`/chat/one`, { - from: `/principal/user%3Aowner%40example.com`, + from: `/principal/system:dev-local`, payload: `hello`, }) expect(ctx.streamClient.putSubscription).toHaveBeenCalledWith( @@ -252,7 +257,7 @@ describe(`dispatch policy routing`, () => { }) ) expect(ctx.entityManager.send).toHaveBeenCalledWith(`/chat/one`, { - from: `/principal/user%3Aowner%40example.com`, + from: `/principal/system:dev-local`, payload: `hello`, }) expect( diff --git a/packages/agents-server/test/electric-agents-routes.test.ts b/packages/agents-server/test/electric-agents-routes.test.ts index 608ab648a3..eaafd9092f 100644 --- a/packages/agents-server/test/electric-agents-routes.test.ts +++ b/packages/agents-server/test/electric-agents-routes.test.ts @@ -997,4 +997,173 @@ describe(`ElectricAgentsRoutes fork endpoint`, () => { expect(payload.root).not.toHaveProperty(`write_token`) expect(payload.root).not.toHaveProperty(`subscription_id`) }) + + it(`forwards anchor, parent, wake, initialMessage, and tags through to forkSubtree (and sends the initial message)`, async () => { + const forkedRoot = { + url: `/chat/root-copy`, + type: `chat`, + status: `idle`, + streams: { + main: `/chat/root-copy/main`, + error: `/chat/root-copy/error`, + }, + subscription_id: `chat-handler`, + write_token: `secret-token`, + tags: { experiment: `ecosystem-maturity` }, + spawn_args: {}, + created_at: 1, + updated_at: 1, + } + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/root` }), + getEntityType: vi.fn(), + }, + forkSubtree: vi.fn().mockResolvedValue({ + root: forkedRoot, + entities: [forkedRoot], + }), + send: vi.fn().mockResolvedValue(undefined), + } as any + + const wake = { + subscriberUrl: `/chat/parent`, + condition: `runFinished` as const, + includeResponse: true, + } + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/root/fork`, + { + anchor: `latest_completed_run`, + parent: `/chat/parent`, + wake, + initialMessage: { text: `hello fork` }, + tags: { experiment: `ecosystem-maturity` }, + } + ) + + expect(response.status).toBe(201) + expect(manager.forkSubtree).toHaveBeenCalledWith(`/chat/root`, { + rootInstanceId: undefined, + waitTimeoutMs: undefined, + anchor: `latest_completed_run`, + parent: `/chat/parent`, + wake, + tags: { experiment: `ecosystem-maturity` }, + }) + // initialMessage is NOT passed into forkSubtree — it's delivered + // via entityManager.send after linkEntityDispatchSubscription, the + // same ordering spawn uses. Verify the send happened against the + // new root fork with the parent as `from`. + expect(manager.send).toHaveBeenCalledWith(`/chat/root-copy`, { + from: `/chat/parent`, + payload: { text: `hello fork` }, + }) + }) + + it(`rejects when fork_pointer and anchor are both present`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/root` }), + getEntityType: vi.fn(), + }, + forkSubtree: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/root/fork`, + { + anchor: `latest_completed_run`, + fork_pointer: { offset: `abc`, sub_offset: 1 }, + } + ) + + expect(response.status).toBe(400) + expect(manager.forkSubtree).not.toHaveBeenCalled() + }) + + it(`rejects when parent is set but does not exist`, async () => { + // getEntity returns the source for the source-route lookup but + // null for the parent lookup. Differentiate by URL. + const getEntity = vi.fn(async (url: string) => + url === `/chat/root` ? { url: `/chat/root` } : null + ) + const manager = { + registry: { getEntity, getEntityType: vi.fn() }, + forkSubtree: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/root/fork`, + { + anchor: `latest_completed_run`, + parent: `/chat/missing-parent`, + } + ) + + expect(response.status).toBe(404) + expect(manager.forkSubtree).not.toHaveBeenCalled() + }) + + it(`rejects when wake is set without parent`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/root` }), + getEntityType: vi.fn(), + }, + forkSubtree: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/root/fork`, + { + anchor: `latest_completed_run`, + wake: { + subscriberUrl: `/chat/stranger`, + condition: `runFinished`, + includeResponse: true, + }, + } + ) + + expect(response.status).toBe(400) + expect(manager.forkSubtree).not.toHaveBeenCalled() + }) + + it(`rejects when wake.subscriberUrl does not match parent`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/root` }), + getEntityType: vi.fn(), + }, + forkSubtree: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/root/fork`, + { + anchor: `latest_completed_run`, + parent: `/chat/parent`, + wake: { + subscriberUrl: `/chat/stranger`, + condition: `runFinished`, + includeResponse: true, + }, + } + ) + + expect(response.status).toBe(401) + expect(manager.forkSubtree).not.toHaveBeenCalled() + }) }) diff --git a/packages/agents-server/test/electric-agents-status.test.ts b/packages/agents-server/test/electric-agents-status.test.ts index 37ddf60c2f..8cdc19a181 100644 --- a/packages/agents-server/test/electric-agents-status.test.ts +++ b/packages/agents-server/test/electric-agents-status.test.ts @@ -688,4 +688,264 @@ describe(`ElectricAgentsManager.forkSubtree`, () => { expect.any(Uint8Array) ) }) + + // Helper for the anchor / wake-rollback tests: builds a streamClient + // backed by a caller-supplied event list, captures every `fork` call's + // opts so the test can assert the resolved pointer, and tracks + // appends + deletes for rollback assertions. + function makeStreamClient(events: Array>) { + const forkCalls: Array<{ + forkPath: string + sourcePath: string + opts?: Record + }> = [] + const deleted: Array = [] + return { + forkCalls, + deleted, + client: { + readJson: vi.fn().mockResolvedValue(events), + exists: vi.fn().mockResolvedValue(false), + fork: vi.fn( + async ( + forkPath: string, + sourcePath: string, + opts?: Record + ) => { + forkCalls.push({ forkPath, sourcePath, opts }) + } + ), + append: vi.fn().mockResolvedValue({ offset: `1` }), + delete: vi.fn(async (path: string) => { + deleted.push(path) + }), + }, + } + } + + it(`anchor 'latest_completed_run' resolves to a fork pointer at the latest completed run`, async () => { + const root = makeEntity(`/chat/anchor-src`) + // Stream layout (one event per log entry, offsets monotonically + // increasing). The runtime mints pointers as + // { offset: , subOffset: <1-based position> } + // So the latest completed run is at offset "D" (run-0 update) and + // the expected pointer is { offset: "C", subOffset: 1 } — "C" is + // the entry-offset of the row before the target (the started-run + // row), and subOffset is 1 since it's the only row in entry "D". + const events = [ + { + type: `entity_created`, + key: `entity-created`, + headers: { operation: `insert`, offset: `A` }, + value: {}, + }, + { + type: `inbox`, + key: `msg-1`, + headers: { operation: `insert`, offset: `B` }, + value: { from: `user`, payload: `hello` }, + }, + { + type: `run`, + key: `run-0`, + headers: { operation: `insert`, offset: `C` }, + value: { status: `started` }, + }, + { + type: `run`, + key: `run-0`, + headers: { operation: `update`, offset: `D` }, + value: { status: `completed` }, + }, + // Subsequent activity past the latest-completed anchor — the + // resolver should still point at the row at offset "D". + { + type: `inbox`, + key: `msg-2`, + headers: { operation: `insert`, offset: `E` }, + value: { from: `user`, payload: `again` }, + }, + { + type: `run`, + key: `run-1`, + headers: { operation: `insert`, offset: `F` }, + value: { status: `started` }, + }, + ] + const { forkCalls, client } = makeStreamClient(events) + const entitiesByUrl = new Map([[root.url, root]]) + + const manager = new EntityManager({ + registry: { + getEntity: vi.fn(async (url: string) => entitiesByUrl.get(url) ?? null), + listEntities: vi.fn().mockResolvedValue({ entities: [], total: 0 }), + createEntity: vi.fn(async (entity: any) => { + entitiesByUrl.set(entity.url, entity) + return 1 + }), + deleteEntity: vi.fn(), + replaceEntityManifestSource: vi.fn(), + replaceSharedStateLink: vi.fn(), + } as any, + streamClient: client as any, + validator: {} as any, + wakeRegistry: { + register: vi.fn(), + unregisterBySubscriber: vi.fn(), + unregisterBySource: vi.fn(), + setTimeoutCallback: vi.fn(), + setDebounceCallback: vi.fn(), + } as any, + }) + + await manager.forkSubtree(root.url, { + rootInstanceId: `anchor-fork`, + anchor: `latest_completed_run`, + waitTimeoutMs: 0, + }) + + const rootForkCall = forkCalls.find( + (call) => call.sourcePath === root.streams.main + ) + expect(rootForkCall).toBeDefined() + expect(rootForkCall?.opts).toEqual({ + forkPointer: { offset: `C`, subOffset: 1 }, + }) + }) + + it(`anchor 'latest_completed_run' rejects when the source has no completed run`, async () => { + const root = makeEntity(`/chat/no-completion`) + const events = [ + { + type: `entity_created`, + key: `entity-created`, + headers: { operation: `insert`, offset: `A` }, + value: {}, + }, + { + type: `inbox`, + key: `msg-1`, + headers: { operation: `insert`, offset: `B` }, + value: { from: `user`, payload: `hi` }, + }, + // Only `started` — no `completed` row anywhere. + { + type: `run`, + key: `run-0`, + headers: { operation: `insert`, offset: `C` }, + value: { status: `started` }, + }, + ] + const { client } = makeStreamClient(events) + const entitiesByUrl = new Map([[root.url, root]]) + + const manager = new EntityManager({ + registry: { + getEntity: vi.fn(async (url: string) => entitiesByUrl.get(url) ?? null), + listEntities: vi.fn().mockResolvedValue({ entities: [], total: 0 }), + createEntity: vi.fn(), + deleteEntity: vi.fn(), + replaceEntityManifestSource: vi.fn(), + replaceSharedStateLink: vi.fn(), + } as any, + streamClient: client as any, + validator: {} as any, + wakeRegistry: { + register: vi.fn(), + unregisterBySubscriber: vi.fn(), + unregisterBySource: vi.fn(), + setTimeoutCallback: vi.fn(), + setDebounceCallback: vi.fn(), + } as any, + }) + + await expect( + manager.forkSubtree(root.url, { + rootInstanceId: `no-anchor-fork`, + anchor: `latest_completed_run`, + waitTimeoutMs: 0, + }) + ).rejects.toMatchObject({ + status: 400, + message: expect.stringContaining(`no completed run`), + }) + }) + + it(`rolls back the fork when wake registration fails`, async () => { + const root = makeEntity(`/chat/rollback-src`) + const events = [ + { + type: `entity_created`, + key: `entity-created`, + headers: { operation: `insert`, offset: `A` }, + value: {}, + }, + { + type: `run`, + key: `run-0`, + headers: { operation: `insert`, offset: `B` }, + value: { status: `started` }, + }, + { + type: `run`, + key: `run-0`, + headers: { operation: `update`, offset: `C` }, + value: { status: `completed` }, + }, + ] + const { client, deleted } = makeStreamClient(events) + const entitiesByUrl = new Map([[root.url, root]]) + const deletedEntities: Array = [] + + const manager = new EntityManager({ + registry: { + getEntity: vi.fn(async (url: string) => entitiesByUrl.get(url) ?? null), + listEntities: vi.fn().mockResolvedValue({ entities: [], total: 0 }), + createEntity: vi.fn(async (entity: any) => { + entitiesByUrl.set(entity.url, entity) + return 1 + }), + deleteEntity: vi.fn(async (url: string) => { + deletedEntities.push(url) + entitiesByUrl.delete(url) + }), + replaceEntityManifestSource: vi.fn(), + replaceSharedStateLink: vi.fn(), + } as any, + streamClient: client as any, + validator: {} as any, + wakeRegistry: { + register: vi.fn().mockRejectedValue(new Error(`wake register boom`)), + unregisterBySubscriber: vi.fn(), + unregisterBySource: vi.fn(), + setTimeoutCallback: vi.fn(), + setDebounceCallback: vi.fn(), + } as any, + }) + + await expect( + manager.forkSubtree(root.url, { + rootInstanceId: `rollback-fork`, + anchor: `latest_completed_run`, + waitTimeoutMs: 0, + parent: `/chat/parent`, + wake: { + subscriberUrl: `/chat/parent`, + condition: `runFinished`, + includeResponse: true, + }, + }) + ).rejects.toThrow(`wake register boom`) + + // The new root fork entity was created then deleted; both of its + // streams were forked then deleted; the source root is untouched. + expect(deletedEntities).toContain(`/chat/rollback-fork`) + expect(deleted).toEqual( + expect.arrayContaining([ + `/chat/rollback-fork/main`, + `/chat/rollback-fork/error`, + ]) + ) + expect(deletedEntities).not.toContain(root.url) + }) }) diff --git a/packages/agents-server/test/event-source-subscriptions-route.test.ts b/packages/agents-server/test/event-source-subscriptions-route.test.ts index d89899fda1..7a477f90f3 100644 --- a/packages/agents-server/test/event-source-subscriptions-route.test.ts +++ b/packages/agents-server/test/event-source-subscriptions-route.test.ts @@ -178,11 +178,16 @@ function tenantContext( } return { service: `svc-agent-1`, + // dev-local is a built-in bypass principal (permissions.ts / + // isBuiltInSystemPrincipalUrl). These tests assert subscription + // routing, not authz — and permission enforcement landed on main + // after they were written, so the registry mocks here don't have + // the entity-permission methods spawned. Bypass is the minimal fix. principal: { kind: `system`, - id: `test`, - key: `system:test`, - url: `/principal/system%3Atest`, + id: `dev-local`, + key: `system:dev-local`, + url: `/principal/system:dev-local`, }, publicUrl: `http://agents.test`, durableStreamsUrl: `http://streams.test/v1/stream/svc-agent-1`, diff --git a/packages/agents/src/agents/horton.ts b/packages/agents/src/agents/horton.ts index 1fb06d0421..2a4d8070cd 100644 --- a/packages/agents/src/agents/horton.ts +++ b/packages/agents/src/agents/horton.ts @@ -4,6 +4,7 @@ import { serverLog } from '../log' import { createHortonDocsSupport } from '../docs/knowledge-base' import { createSkillTools } from '@electric-ax/agents-runtime' import { createSpawnWorkerTool } from '../tools/spawn-worker' +import { createForkTool } from '../tools/fork' import { modelInputSchemaDefs, modelChoiceValues, @@ -239,6 +240,7 @@ When a user opens with a greeting ("hi", "hello", "hey", etc.) or a broad statem - web_search: search the web - fetch_url: fetch and convert a URL to markdown - spawn_worker: dispatch a subagent for an isolated task +- fork: spawn a child session that inherits this conversation's history up to the latest completed response. Same parent-ownership model as spawn_worker — when the fork's next run finishes, you'll wake with its response. - send: send a message to an Electric Agent/entity. To schedule future work for yourself, call send with self: true and afterMs. ${eventSourceTools}${docsTools}${skillsTools} @@ -267,6 +269,20 @@ When you spawn a worker, write its system prompt the way you'd brief a colleague After spawning, end your turn (optionally with a brief "I've dispatched a worker for X; I'll respond when it finishes"). When the worker finishes, you'll receive a message describing which worker completed and what it returned. Multiple workers may finish at different times — check the message for the worker URL to know which one you're hearing about. +# When to fork (vs spawn_worker) +\`fork\` is the **sibling primitive to spawn_worker** — both create a child you own, both report back to you on a future wake. The difference is only in what the child boots with: + +- **spawn_worker** → child boots with an **empty context**; you brief it from scratch via a system prompt + initial message. Use when the worker doesn't need to know what we've said so far. +- **fork** → child boots with **a copy of THIS conversation's history** up to your latest completed response. Use when each child needs to know what we've already established (the user's framing, your prior analysis, an earlier decision, the constraints, etc.). + +**Trigger pattern: prefer fork when generating multiple variants the user wants to compare.** If the user asks for "three different X" or "two takes on Y" or "evaluate these N approaches" and each variant should reflect the conversation we've had so far, **don't inline the variants in one response** — fork once per variant, send each a tailored follow-up, and synthesize when they report back. Inlining feels faster but the variants end up cross-contaminating in your single response; forks keep them honestly independent. The exception is trivial generation (a list of names, a couple of one-liners) where each variant takes a sentence — there, inline is fine. + +Workflow when forking yourself for parallel exploration: +1. **End your current turn first.** The fork's history stops at your *latest completed* run. Anything you say mid-turn is NOT in the fork. If you want your analysis baked into each fork, finish it and end the turn before calling fork. +2. On the next wake, call \`fork\` once per branch with a different \`initialMessage\` per call — that's how the branches diverge from a shared starting point. Each fork is YOUR child, just like a spawned worker, and the \`initialMessage\` is delivered to it atomically with creation, so the fork starts running immediately (no follow-up \`send\` needed). Use the shape \`{ text: "..." }\` for the message so it renders in the chat UI. +3. End your turn. You'll wake automatically when each fork's run finishes (same wake mechanism as spawn_worker); the wake message identifies the fork and includes its response. +4. If you're waiting on multiple forks, don't synthesize on the first wake — quietly end the turn with "got N of M, waiting" until you have what you need to compare. + # Reporting Report outcomes faithfully. If a command failed, say so with the relevant output. If you didn't run a verification step, say that rather than implying you did. Don't hedge confirmed results with unnecessary disclaimers. @@ -308,6 +324,7 @@ export function createHortonTools( ] : [createFetchUrlTool(sandbox)]), createSpawnWorkerTool(ctx, opts.modelConfig), + createForkTool(ctx), createSendTool(ctx.send, { selfEntityUrl: ctx.entityUrl }), ...(opts.docsSearchTool ? [opts.docsSearchTool] : []), ] diff --git a/packages/agents/src/index.ts b/packages/agents/src/index.ts index 095c9f33b2..4c80b1537c 100644 --- a/packages/agents/src/index.ts +++ b/packages/agents/src/index.ts @@ -59,5 +59,6 @@ export { createSpawnWorkerTool, } from './tools/spawn-worker.js' export type { WorkerToolName } from './tools/spawn-worker.js' +export { createForkTool } from './tools/fork.js' export { createHortonDocsSupport } from './docs/knowledge-base.js' export { braveSearchTool } from '@electric-ax/agents-runtime/tools' diff --git a/packages/agents/src/tools/fork.ts b/packages/agents/src/tools/fork.ts new file mode 100644 index 0000000000..c1d29dacaa --- /dev/null +++ b/packages/agents/src/tools/fork.ts @@ -0,0 +1,93 @@ +import { Type } from '@sinclair/typebox' +import { nanoid } from 'nanoid' +import { serverLog } from '../log' +import type { AgentTool } from '@mariozechner/pi-agent-core' +import type { HandlerContext } from '@electric-ax/agents-runtime' + +export function createForkTool(ctx: HandlerContext): AgentTool { + return { + name: `fork`, + label: `Fork`, + description: `Fork a session at its latest completed agent response, producing a child copy of the conversation up to that point. The new fork is YOUR child — same parent-ownership model as a spawned worker — and it reports back to you the same way: when its next run finishes you'll be woken with its response. End your turn after forking. + +Prefer supplying an 'initialMessage' so the fork is dispatched immediately in a single call — no follow-up 'send' needed. If you omit it, the fork boots idle and you'll need to call 'send' afterwards. For chat-rendered messages use the shape \`{ "text": "..." }\` so the prompt shows up in the chat UI. + +Use this to explore multiple alternative continuations in parallel from the same starting point. End your current turn first so the fork includes your latest response — the anchor is always the most recently completed run. + +Omit 'entityUrl' to fork your own session. Pass a different session's URL to fork that session instead (the new fork is still your child). The optional 'id' names the new fork's instance — useful when you want stable, predictable URLs (e.g. labelling branches in a parallel exploration); omit to let the server mint one.`, + parameters: Type.Object({ + entityUrl: Type.Optional( + Type.String({ + description: `URL of the session to fork. Omit to fork your own session.`, + }) + ), + id: Type.Optional( + Type.String({ + description: `Instance id for the new fork (the \`\` in \`/horton/\`). Mirrors spawn_worker's id parameter. Omit to let the server assign one.`, + }) + ), + initialMessage: Type.Optional( + Type.Any({ + description: `Initial inbox message delivered to the fork atomically with creation — the fork wakes and starts running immediately. Use the shape \`{ "text": "..." }\` for chat-rendered prompts. Omit to leave the fork idle (then call 'send' separately).`, + }) + ), + tags: Type.Optional( + Type.Record(Type.String(), Type.String(), { + description: `Optional tags stamped on the new fork, on top of those copied from the source. Useful for labelling experiments (e.g. \`{ "experiment": "ecosystem-maturity" }\`).`, + }) + ), + }), + execute: async (_toolCallId, params) => { + const { entityUrl, id, initialMessage, tags } = params as { + entityUrl?: string + id?: string + initialMessage?: unknown + tags?: Record + } + try { + const opts = { + ...(initialMessage !== undefined && { initialMessage }), + ...(tags !== undefined && { tags }), + } + // The library API (`ctx.fork` / `ctx.forkSelf`) requires an id + // — same shape as `ctx.spawn(type, id, ...)`. The model layer + // doesn't need to know this; we generate one via nanoid when + // it's not supplied (same pattern `createSpawnWorkerTool` uses + // for the worker's id). + const forkId = id ?? `fork-${nanoid(10)}` + const handle = + entityUrl !== undefined + ? await ctx.fork(entityUrl, forkId, opts) + : await ctx.forkSelf(forkId, opts) + const dispatchNote = + initialMessage !== undefined + ? `The initial message has been delivered to the fork — it will start running.` + : `The fork boots idle — use the 'send' tool to dispatch a follow-up prompt.` + return { + content: [ + { + type: `text` as const, + text: `Forked at ${handle.entityUrl}. ${dispatchNote} End your turn; you'll wake with the fork's response when its next run finishes (same as a spawned worker).`, + }, + ], + details: { forked: true, forkUrl: handle.entityUrl }, + } + } catch (err) { + const message = err instanceof Error ? err.message : `Unknown error` + serverLog.warn( + `[fork tool] failed to fork ${entityUrl ?? ``}: ${message}`, + err instanceof Error ? err : undefined + ) + return { + content: [ + { + type: `text` as const, + text: `Error forking session: ${message}`, + }, + ], + details: { forked: false }, + } + } + }, + } +}