From 2db00291e9b15ff39e261812aec58114fcbc01c6 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 3 Jun 2026 11:48:38 +0200 Subject: [PATCH 01/10] feat(agents): add a `fork` tool so Horton can branch the current session `POST /_electric/entities///fork` gains an optional `anchor: 'latest_completed_run'` body field as an alternative to `fork_pointer`. When present, the server scans the source root's `main` history, finds the last `runs` row with `status === 'completed'`, derives the matching `{ offset, sub_offset }` pointer, and runs the existing pointer-fork path with it. Mutually exclusive with `fork_pointer`; 400 if no completed run exists. This lets callers without access to the source's per-row pointer side-table (e.g. an agent forking via a tool) fork at the same anchor the per-row "Fork from here" UI uses. `HandlerContext` gains `fork(targetEntityUrl?, opts?)`. Defaults `targetEntityUrl` to `ctx.entityUrl` for self-fork. Auto-observes the new fork with `runFinished` + `includeResponse` so the caller wakes when the fork's next run finishes; opt out with `observe: false`. Wired through a new `RuntimeServerClient.forkEntity` method and a new `WiringConfig.forkEntity` injection point alongside `createOrGetChild`. Horton gets a `fork` tool (optional `entityUrl` parameter) that delegates to `ctx.fork`, plus a "When to fork (vs spawn_worker)" section in the system prompt framing the distinction: spawn for an isolated subtask with an empty context, fork for parallel exploration that needs the conversation's full history. Includes the end-turn-first / send-different-prompts / wait-for-all-responses workflow for parallel-exploration patterns. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/fork-tool-horton.md | 5 + .changeset/fork-tool-runtime.md | 5 + .changeset/fork-tool-server.md | 6 + .../agents-runtime/src/context-factory.ts | 10 ++ packages/agents-runtime/src/process-wake.ts | 29 ++++- .../src/runtime-server-client.ts | 31 +++++ packages/agents-runtime/src/setup-context.ts | 7 ++ packages/agents-runtime/src/types.ts | 14 +++ .../test/context-factory.test.ts | 2 + .../test/helpers/context-test-helpers.ts | 1 + .../agents-runtime/test/record-run.test.ts | 1 + ...time-server-client-update-metadata.test.ts | 1 + .../agents-runtime/test/setup-context.test.ts | 1 + packages/agents-server/src/entity-manager.ts | 109 ++++++++++++++++-- .../src/routing/entities-router.ts | 13 +++ packages/agents/src/agents/horton.ts | 17 +++ packages/agents/src/index.ts | 1 + packages/agents/src/tools/fork.ts | 53 +++++++++ 18 files changed, 294 insertions(+), 12 deletions(-) create mode 100644 .changeset/fork-tool-horton.md create mode 100644 .changeset/fork-tool-runtime.md create mode 100644 .changeset/fork-tool-server.md create mode 100644 packages/agents/src/tools/fork.ts diff --git a/.changeset/fork-tool-horton.md b/.changeset/fork-tool-horton.md new file mode 100644 index 0000000000..5c93cca162 --- /dev/null +++ b/.changeset/fork-tool-horton.md @@ -0,0 +1,5 @@ +--- +'@electric-ax/agents': patch +--- + +Give Horton a `fork` tool so an agent can create a sibling session that inherits the current conversation's history up to its latest completed response. The tool takes an optional `entityUrl` (omit for self-fork) and delegates to `ctx.fork`, which auto-observes the new fork with `runFinished` + `includeResponse` so the caller wakes when the fork's next run finishes. Horton's system prompt grows a "When to fork (vs spawn_worker)" section framing the distinction — spawn for isolated subtasks with empty context, fork for parallel exploration that needs the conversation's full history — plus the end-turn-first / send-different-prompts / wait-for-all-responses workflow for parallel-exploration patterns (analyze, fork N times, synthesise the winner). diff --git a/.changeset/fork-tool-runtime.md b/.changeset/fork-tool-runtime.md new file mode 100644 index 0000000000..73d0a204d2 --- /dev/null +++ b/.changeset/fork-tool-runtime.md @@ -0,0 +1,5 @@ +--- +'@electric-ax/agents-runtime': patch +--- + +Add `ctx.fork(targetEntityUrl?, opts?)` to `HandlerContext`. Calls the agents-server fork endpoint with `anchor: 'latest_completed_run'` to create a sibling session that inherits the source's history up to the most recent completed run. Defaults `targetEntityUrl` to `ctx.entityUrl` (self-fork). Auto-observes the new fork with `wake: { on: 'runFinished', includeResponse: true }` so the caller wakes when the fork's next run finishes; pass `observe: false` for fire-and-forget. Wired through `RuntimeServerClient.forkEntity` and a new `WiringConfig.forkEntity` injection point alongside `createOrGetChild`. diff --git a/.changeset/fork-tool-server.md b/.changeset/fork-tool-server.md new file mode 100644 index 0000000000..e014fa5c2d --- /dev/null +++ b/.changeset/fork-tool-server.md @@ -0,0 +1,6 @@ +--- +'@electric-ax/agents-server': patch +'@electric-ax/agents-server-ui': patch +--- + +Add a server-resolved fork anchor for `POST /_electric/entities///fork`. The fork body now accepts an optional `anchor: 'latest_completed_run'` field as an alternative to `fork_pointer`: the server scans the source root's `main` history, finds the most recent `runs` row with `status === 'completed'`, derives the matching `{ offset, sub_offset }` pointer, and runs the existing pointer-fork path with it. Mutually exclusive with `fork_pointer` (400 if both); errors with 400 if the source has no completed run. Lets callers without access to the source's per-row pointer side-table (e.g. an agent forking a session via a tool) still fork at the same anchor the per-row "Fork from here" UI uses. diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index d0c8354fd0..98412984f9 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -102,6 +102,10 @@ export interface HandlerContextConfig { observe?: boolean } ) => Promise + doFork: ( + targetEntityUrl: string, + opts?: { observe?: boolean } + ) => Promise<{ url: string }> doMkdb: ( id: string, schema: TSchema @@ -758,6 +762,12 @@ export function createHandlerContext( ): Promise { return config.doSpawn(type, id, args, opts) }, + fork( + targetEntityUrl?: string, + opts?: { observe?: boolean } + ): Promise<{ url: string }> { + return config.doFork(targetEntityUrl ?? config.entityUrl, opts) + }, mkdb( id: string, schema: TSchema diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index 63b0c3abb7..a180f83b23 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -17,7 +17,7 @@ import { buildHydratedEventSourceWake, eventSourceWakeInfoFromManifests, } from './event-sources' -import { webhookObservationCollections } from './observation-sources' +import { entity, webhookObservationCollections } from './observation-sources' import type { HydratedEventSourceWake } from './event-sources' import { SandboxError } from './sandbox/types' import type { Sandbox } from './sandbox/types' @@ -1328,6 +1328,13 @@ export async function processWake( }) }, + forkEntity: async ( + sourceEntityUrl: string + ): Promise<{ entityUrl: string; streamPath: string }> => { + const result = await serverClient.forkEntity({ sourceEntityUrl }) + return { entityUrl: result.entityUrl, streamPath: result.streamPath } + }, + createChildDb: async ( childStreamUrl: string, childTypeName?: string, @@ -1749,6 +1756,25 @@ export async function processWake( return setupCtx.spawn(type, id, spawnArgs, opts) } + const doFork = async ( + targetEntityUrl: string, + opts?: { observe?: boolean } + ): Promise<{ url: string }> => { + const { entityUrl: forkUrl } = + await wiringConfig.forkEntity(targetEntityUrl) + // Auto-observe by default: register a runFinished wake on the new + // fork so the caller wakes when the fork's next run completes. + // Mirrors `spawn_worker`'s default wake. Opt out with + // `observe: false` for fire-and-forget forks. + if (opts?.observe !== false) { + await doObserve(entity(forkUrl), { + on: `runFinished`, + includeResponse: true, + }) + } + return { url: forkUrl } + } + const doMkdb = ( id: string, schema: TSchema @@ -2039,6 +2065,7 @@ export async function processWake( hydratedEventSourceWake: await hydrateCurrentEventSourceWake(), doObserve, doSpawn, + doFork, doMkdb, doCreateAttachment: (attachment) => serverClient diff --git a/packages/agents-runtime/src/runtime-server-client.ts b/packages/agents-runtime/src/runtime-server-client.ts index 3142059414..0a5cd86b5c 100644 --- a/packages/agents-runtime/src/runtime-server-client.ts +++ b/packages/agents-runtime/src/runtime-server-client.ts @@ -121,6 +121,14 @@ export interface RuntimeServerClient { id: string }) => Promise spawnEntity: (options: SpawnEntityOptions) => Promise + /** + * Fork an entity at the server-resolved `latest_completed_run` anchor. + * Resolves to the new root entity's info. Wraps the agents-server + * `POST /_electric/entities///fork` endpoint. + */ + forkEntity: (options: { + sourceEntityUrl: string + }) => Promise getEntity: (entityUrl: string) => Promise ensureSharedStateStream: ( sharedStateId: string, @@ -449,6 +457,28 @@ export function createRuntimeServerClient( return entityInfo } + const forkEntity = async ({ + sourceEntityUrl, + }: { + sourceEntityUrl: string + }): Promise => { + const response = await request(entityRpcPath(sourceEntityUrl, `/fork`), { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ anchor: `latest_completed_run` }), + }) + if (!response.ok) { + throw new Error( + `fork ${sourceEntityUrl} failed (${response.status}): ${await readErrorText(response)}` + ) + } + const payload = (await response.json()) as { root?: RuntimeEntityResponse } + return requireEntityInfo( + payload.root, + `fork ${sourceEntityUrl} returned an invalid root payload` + ) + } + const ensureSharedStateStream = async ( sharedStateId: string, ownerEntityUrl?: string @@ -778,6 +808,7 @@ export function createRuntimeServerClient( createAttachment, readAttachment, spawnEntity, + forkEntity, getEntity, ensureSharedStateStream, signalEntity, diff --git a/packages/agents-runtime/src/setup-context.ts b/packages/agents-runtime/src/setup-context.ts index a73f7f76e8..bf95edcf29 100644 --- a/packages/agents-runtime/src/setup-context.ts +++ b/packages/agents-runtime/src/setup-context.ts @@ -68,6 +68,13 @@ export interface WiringConfig { sandbox?: SpawnSandboxOption } ) => Promise<{ entityUrl: string; streamPath: string }> + /** + * Fork a top-level entity at the server-resolved latest completed + * run. Returns the new root entity's URL + main stream path. + */ + forkEntity: ( + sourceEntityUrl: string + ) => Promise<{ entityUrl: string; streamPath: string }> /** Create a child StreamDB, preload it, and register it for cleanup. */ createChildDb: ( streamUrl: string, diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index 7aaa5180a7..40960bb1bb 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -987,6 +987,20 @@ export interface HandlerContext< sandbox?: SpawnSandboxOption } ) => Promise + /** + * Fork a session at the latest completed run on its `main` stream. + * Defaults to this entity (self-fork) when `targetEntityUrl` is + * omitted. The fork is a sibling — not a child — of the source. + * + * `observe: true` (the default) registers an observation on the new + * fork wired to wake this entity on the fork's next `runFinished`. + * Pass `observe: false` for fire-and-forget forks where the caller + * never plans to react to the fork's completion. + */ + fork: ( + targetEntityUrl?: string, + opts?: { observe?: boolean } + ) => Promise<{ url: string }> observe: (( source: ObservationSource & { sourceType: `entity` }, opts?: { wake?: Wake } diff --git a/packages/agents-runtime/test/context-factory.test.ts b/packages/agents-runtime/test/context-factory.test.ts index 166ecb96b0..da09faaddc 100644 --- a/packages/agents-runtime/test/context-factory.test.ts +++ b/packages/agents-runtime/test/context-factory.test.ts @@ -97,6 +97,7 @@ describe(`createHandlerContext`, () => { }, doObserve: vi.fn(), doSpawn: vi.fn(), + doFork: vi.fn(), doMkdb: vi.fn(), executeSend: vi.fn(), tags: {}, @@ -223,6 +224,7 @@ describe(`createHandlerContext`, () => { hydratedEventSourceWake, doObserve: vi.fn(), doSpawn: vi.fn(), + doFork: vi.fn(), doMkdb: vi.fn(), executeSend: vi.fn(), tags: {}, diff --git a/packages/agents-runtime/test/helpers/context-test-helpers.ts b/packages/agents-runtime/test/helpers/context-test-helpers.ts index 4b26ede32d..19a88bd2f8 100644 --- a/packages/agents-runtime/test/helpers/context-test-helpers.ts +++ b/packages/agents-runtime/test/helpers/context-test-helpers.ts @@ -335,6 +335,7 @@ export function createTestHandlerContext( hydratedEventSourceWake: opts.hydratedEventSourceWake, doObserve: vi.fn(), doSpawn: vi.fn(), + doFork: vi.fn(), doMkdb: vi.fn(), executeSend: vi.fn(), doSetTag: vi.fn(async () => undefined), diff --git a/packages/agents-runtime/test/record-run.test.ts b/packages/agents-runtime/test/record-run.test.ts index 40424522ad..927188da93 100644 --- a/packages/agents-runtime/test/record-run.test.ts +++ b/packages/agents-runtime/test/record-run.test.ts @@ -89,6 +89,7 @@ function buildHarness(opts?: { existingRunKeys?: Array }): { }, doObserve: vi.fn(), doSpawn: vi.fn(), + doFork: vi.fn(), doMkdb: vi.fn(), executeSend: vi.fn(), tags: {}, diff --git a/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts b/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts index 3bb93b08d2..2361a47db9 100644 --- a/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts +++ b/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts @@ -270,6 +270,7 @@ describe(`createHandlerContext: tags + tag mutations`, () => { wakeEvent: { type: `inbox`, payload: `hi` } as any, doObserve: () => Promise.resolve({} as any), doSpawn: () => Promise.resolve({} as any), + doFork: () => Promise.resolve({ url: `/horton/fork` }), doMkdb: () => ({}) as any, executeSend: async () => ({ sent: true, targetUrl: `/horton/x` }), doSetTag: async (key, value) => { diff --git a/packages/agents-runtime/test/setup-context.test.ts b/packages/agents-runtime/test/setup-context.test.ts index 73a07ae134..a95e538d2f 100644 --- a/packages/agents-runtime/test/setup-context.test.ts +++ b/packages/agents-runtime/test/setup-context.test.ts @@ -2198,6 +2198,7 @@ describe(`entity patterns`, () => { customStateNames: [], wiring: { createOrGetChild: vi.fn(), + forkEntity: vi.fn(), createChildDb, createSourceDb: vi.fn(), createSharedStateDb: vi.fn(), diff --git a/packages/agents-server/src/entity-manager.ts b/packages/agents-server/src/entity-manager.ts index 478601f8d9..d46d0cf734 100644 --- a/packages/agents-server/src/entity-manager.ts +++ b/packages/agents-server/src/entity-manager.ts @@ -173,6 +173,18 @@ type ForkSubtreeOptions = { * regardless. */ forkPointer?: EventPointer + /** + * Named server-resolved anchor. Resolves to a concrete `forkPointer` + * by reading the source root's `main` history. Mutually exclusive with + * `forkPointer`. Use this when the caller doesn't have access to the + * source's per-row pointer side-table (e.g. an agent forking a session + * via a tool). + * + * - `latest_completed_run` — the most recent `runs` row on the source + * with `status === 'completed'`. Errors if no such row exists. This + * is the eligibility rule the per-row "Fork from here" UI uses. + */ + anchor?: `latest_completed_run` } type ForkEntityPlan = { @@ -905,6 +917,13 @@ export class EntityManager { const writeEntityLocks = new Set() const writeStreamLocks = new Set() + // Anchor-forks resolve to a concrete pointer server-side by reading the + // source's `main` history. Below this point the resolved pointer (or + // an explicit one from `opts.forkPointer`) drives the same code path — + // anchor handling collapses entirely into the pointer-fork flow. + const usePointerPath = + opts.forkPointer !== undefined || opts.anchor !== undefined + try { // For pointer-forks we read the source root HISTORICALLY at a // frozen offset, so concurrent activity on the root past the @@ -916,7 +935,7 @@ export class EntityManager { // those are HEAD-cloned and need a stable snapshot. For HEAD-forks // the old all-idle requirement still applies. let sourceTree: Array - if (opts.forkPointer) { + if (usePointerPath) { const rootEntity = await this.registry.getEntity(rootUrl) if (!rootEntity) { throw new ElectricAgentsError( @@ -945,11 +964,11 @@ export class EntityManager { ) } - // When forking at a pointer, pre-read the root's main, validate the - // pointer against the source's true history, and materialise the - // root-at-pointer snapshot fragments. The pointer only applies to - // the root's `main` stream. Descendants kept by the manifest filter - // are forked at HEAD. + // When forking at a pointer (or anchor), pre-read the root's main, + // validate the pointer against the source's true history, and + // materialise the root-at-pointer snapshot fragments. The pointer + // only applies to the root's `main` stream. Descendants kept by + // the manifest filter are forked at HEAD. // // Pointer→position translation: the runtime mints pointers as // `{ offset: previousBatchOffset, subOffset: itemIndex+1 }`, where @@ -968,16 +987,32 @@ export class EntityManager { sharedStateIds: Set } | undefined - if (opts.forkPointer) { + let effectiveForkPointer: EventPointer | undefined = opts.forkPointer + if (usePointerPath) { const sourceEvents = await this.streamClient.readJson< Record >(sourceRoot.streams.main) const flat = sourceEvents.flatMap((item) => Array.isArray(item) ? item : [item] ) as Array> + if (!effectiveForkPointer && opts.anchor === `latest_completed_run`) { + effectiveForkPointer = this.resolveLatestCompletedRunPointer( + flat, + sourceRoot.streams.main + ) + } + if (!effectiveForkPointer) { + // Defensive — would only fire on a future anchor variant we + // forget to handle above. + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Internal: pointer-path fork with no resolved pointer`, + 500 + ) + } const target = this.resolveForkPointerTarget( flat, - opts.forkPointer, + effectiveForkPointer, sourceRoot.streams.main ) const filteredEvents = flat.slice(0, target) @@ -1009,7 +1044,7 @@ export class EntityManager { // subtree except the root) are HEAD-cloned, so they must be idle // before we read their snapshots. Wait+lock only those — the root // was skipped above. - if (opts.forkPointer) { + if (usePointerPath) { const descendants = effectiveSubtree.filter( (entity) => entity.url !== sourceRoot.url ) @@ -1085,8 +1120,8 @@ export class EntityManager { await this.streamClient.fork( plan.fork.streams.main, plan.source.streams.main, - isRoot && opts.forkPointer - ? { forkPointer: opts.forkPointer } + isRoot && effectiveForkPointer + ? { forkPointer: effectiveForkPointer } : undefined ) createdStreams.push(plan.fork.streams.main) @@ -1463,6 +1498,58 @@ export class EntityManager { return positionAtAnchor + pointer.subOffset } + /** + * Find an `EventPointer` that addresses the most recent `runs` row on + * the source's `main` stream with `status === 'completed'`. Mirrors the + * eligibility rule the per-row "Fork from here" UI applies — only + * completed runs are valid fork anchors; `started`/`failed` rows are + * skipped. + * + * The pointer is computed in the same coordinate system the runtime + * mints pointers in (see entity-stream-db's onBeforeBatch): for a row + * R in log entry E, the pointer is `{ offset: P, subOffset: K }` where + * `P` is the END offset of the log entry preceding E (or `null` for + * stream start) and `K` is R's 1-indexed position within E. + */ + private resolveLatestCompletedRunPointer( + events: ReadonlyArray>, + streamPath: string + ): EventPointer { + let priorEntryOffset: string | null = null + let currentEntryOffset: string | null = null + let positionInEntry = 0 + let latest: EventPointer | null = null + + for (const event of events) { + const headers = isRecord(event.headers) ? event.headers : undefined + const eventOffset = + typeof headers?.offset === `string` ? headers.offset : null + + if (eventOffset !== currentEntryOffset) { + priorEntryOffset = currentEntryOffset + currentEntryOffset = eventOffset + positionInEntry = 0 + } + positionInEntry++ + + if (event.type !== `run`) continue + if (headers?.operation === `delete`) continue + if (!isRecord(event.value)) continue + if (event.value.status !== `completed`) continue + + latest = { offset: priorEntryOffset, subOffset: positionInEntry } + } + + if (!latest) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Source ${streamPath} has no completed run to fork from`, + 400 + ) + } + return latest + } + /** * Compute the subset of `sourceTree` that survives the manifest filter * applied at the root. After filtering the root's manifest at the fork diff --git a/packages/agents-server/src/routing/entities-router.ts b/packages/agents-server/src/routing/entities-router.ts index 71a70ae297..0f190e4916 100644 --- a/packages/agents-server/src/routing/entities-router.ts +++ b/packages/agents-server/src/routing/entities-router.ts @@ -195,6 +195,11 @@ const forkBodySchema = Type.Object({ sub_offset: Type.Number(), }) ), + // Named server-resolved anchor. Resolves to a concrete fork pointer on + // the source root's `main` server-side, so callers don't need access + // to the source's per-row pointer side-table. Mutually exclusive with + // `fork_pointer`. + anchor: Type.Optional(Type.Literal(`latest_completed_run`)), }) const setTagBodySchema = Type.Object({ @@ -1064,6 +1069,13 @@ async function forkEntity( if (principalMutationError) return principalMutationError const parsed = routeBody(request) + if (parsed.fork_pointer && parsed.anchor) { + return apiError( + 400, + ErrCodeInvalidRequest, + `fork_pointer and anchor are mutually exclusive` + ) + } const { entityUrl, entity } = requireExistingEntityRoute(request) await assertDispatchPolicyAllowed(ctx, entity.dispatch_policy) const result = await ctx.entityManager.forkSubtree(entityUrl, { @@ -1075,6 +1087,7 @@ async function forkEntity( subOffset: parsed.fork_pointer.sub_offset, }, }), + ...(parsed.anchor && { anchor: parsed.anchor }), }) for (const forkedEntity of result.entities) { await linkEntityDispatchSubscription(ctx, forkedEntity) diff --git a/packages/agents/src/agents/horton.ts b/packages/agents/src/agents/horton.ts index 1fb06d0421..f8bc98749d 100644 --- a/packages/agents/src/agents/horton.ts +++ b/packages/agents/src/agents/horton.ts @@ -4,6 +4,7 @@ import { serverLog } from '../log' import { createHortonDocsSupport } from '../docs/knowledge-base' import { createSkillTools } from '@electric-ax/agents-runtime' import { createSpawnWorkerTool } from '../tools/spawn-worker' +import { createForkTool } from '../tools/fork' import { modelInputSchemaDefs, modelChoiceValues, @@ -239,6 +240,7 @@ When a user opens with a greeting ("hi", "hello", "hey", etc.) or a broad statem - web_search: search the web - fetch_url: fetch and convert a URL to markdown - spawn_worker: dispatch a subagent for an isolated task +- fork: create a sibling session that inherits this conversation's history up to the latest completed response. Auto-observes — you'll wake when the fork's next run finishes. - send: send a message to an Electric Agent/entity. To schedule future work for yourself, call send with self: true and afterMs. ${eventSourceTools}${docsTools}${skillsTools} @@ -267,6 +269,20 @@ When you spawn a worker, write its system prompt the way you'd brief a colleague After spawning, end your turn (optionally with a brief "I've dispatched a worker for X; I'll respond when it finishes"). When the worker finishes, you'll receive a message describing which worker completed and what it returned. Multiple workers may finish at different times — check the message for the worker URL to know which one you're hearing about. +# When to fork (vs spawn_worker) +The two tools solve different problems: +- **spawn_worker** creates a NEW subagent with an EMPTY context. You brief it with a system prompt and an initial message from scratch. Use it to delegate an isolated subtask where the worker doesn't need our conversation history. +- **fork** creates a sibling SESSION that inherits THIS conversation's full history up to your latest completed response. Use it to explore multiple alternative continuations of the same conversation in parallel — when each branch needs to "know what we already said," not just a brief. + +Typical fork use cases: trying two or three different answers to the same question and comparing, A/B-testing an approach at a decision point, or any "what if I'd responded differently here" exploration. + +Workflow when forking yourself for parallel exploration: +1. **End your current turn first.** The fork's history stops at your *latest completed* run. Anything you say mid-turn is NOT in the fork. If you want your analysis baked into each fork, finish it and end the turn before calling fork. +2. On the next wake, call \`fork\` once per branch you want to explore (or pass an \`entityUrl\` to fork a different session). +3. For each fork URL the tool returned, use \`send\` to deliver a *different* follow-up prompt — that's how the branches diverge from a shared starting point. +4. End your turn. You'll wake automatically when each fork's run finishes; the wake message includes the fork's response. +5. If you're waiting on multiple forks, don't synthesize on the first wake — quietly end the turn with something like "got N of M, waiting" until you have what you need to compare. + # Reporting Report outcomes faithfully. If a command failed, say so with the relevant output. If you didn't run a verification step, say that rather than implying you did. Don't hedge confirmed results with unnecessary disclaimers. @@ -308,6 +324,7 @@ export function createHortonTools( ] : [createFetchUrlTool(sandbox)]), createSpawnWorkerTool(ctx, opts.modelConfig), + createForkTool(ctx), createSendTool(ctx.send, { selfEntityUrl: ctx.entityUrl }), ...(opts.docsSearchTool ? [opts.docsSearchTool] : []), ] diff --git a/packages/agents/src/index.ts b/packages/agents/src/index.ts index 095c9f33b2..4c80b1537c 100644 --- a/packages/agents/src/index.ts +++ b/packages/agents/src/index.ts @@ -59,5 +59,6 @@ export { createSpawnWorkerTool, } from './tools/spawn-worker.js' export type { WorkerToolName } from './tools/spawn-worker.js' +export { createForkTool } from './tools/fork.js' export { createHortonDocsSupport } from './docs/knowledge-base.js' export { braveSearchTool } from '@electric-ax/agents-runtime/tools' diff --git a/packages/agents/src/tools/fork.ts b/packages/agents/src/tools/fork.ts new file mode 100644 index 0000000000..6925b89617 --- /dev/null +++ b/packages/agents/src/tools/fork.ts @@ -0,0 +1,53 @@ +import { Type } from '@sinclair/typebox' +import { serverLog } from '../log' +import type { AgentTool } from '@mariozechner/pi-agent-core' +import type { HandlerContext } from '@electric-ax/agents-runtime' + +export function createForkTool(ctx: HandlerContext): AgentTool { + return { + name: `fork`, + label: `Fork`, + description: `Fork a session at its latest completed agent response, producing a sibling copy of the conversation up to that point. The new session boots idle — use the existing 'send' tool to dispatch a follow-up prompt to it. The fork is auto-observed: when its next run finishes you will be woken with the response, so end your turn after forking. + +Use this to explore multiple alternative continuations in parallel from the same starting point. End your current turn first so the fork includes your latest response — the anchor is always the most recently completed run. + +Omit 'entityUrl' to fork your own session. Pass a different session's URL to fork that session instead.`, + parameters: Type.Object({ + entityUrl: Type.Optional( + Type.String({ + description: `URL of the session to fork. Omit to fork your own session.`, + }) + ), + }), + execute: async (_toolCallId, params) => { + const { entityUrl } = params as { entityUrl?: string } + try { + const { url } = await ctx.fork(entityUrl) + return { + content: [ + { + type: `text` as const, + text: `Forked at ${url}. The fork boots idle — use the 'send' tool to dispatch a follow-up prompt. End your turn afterwards; you'll wake when the fork's next run finishes.`, + }, + ], + details: { forked: true, forkUrl: url }, + } + } catch (err) { + const message = err instanceof Error ? err.message : `Unknown error` + serverLog.warn( + `[fork tool] failed to fork ${entityUrl ?? ``}: ${message}`, + err instanceof Error ? err : undefined + ) + return { + content: [ + { + type: `text` as const, + text: `Error forking session: ${message}`, + }, + ], + details: { forked: false }, + } + } + }, + } +} From cfc55fa8e753dd24c8b5223ca588b236a2a6602a Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 4 Jun 2026 10:33:56 +0200 Subject: [PATCH 02/10] refactor(fork): make forks children of the forking entity, not siblings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change `fork` from creating a sibling (top-level) session to creating a child of the forking entity, mirroring the spawn primitive's parent-ownership model. Reply delivery now uses the same manifest-anchored wake path spawn uses: server registers a runFinished + includeResponse subscription on the new fork at fork time, runtime writes a `kind: 'child'` manifest row on the parent's stream, and the parent wakes with the fork's response inline when the fork's next run finishes. The previous out-of-band `ctx.observe` auto-call is gone. Wire shape: - `forkBodySchema` gains `parent` and `wake` (same shapes as `spawnBodySchema`); forkSubtreeInner overrides the new root fork's parent and registers the wake via `wakeRegistry.register`. - `RuntimeServerClient.forkEntity` and `WiringConfig.forkEntity` accept the new fields; `doFork` passes `parent: ctx.entityUrl` + `wake: { subscriberUrl, condition: 'runFinished', includeResponse: true }` and writes the manifest-child row via `wakeSession.registerManifestEntry`. - `ctx.fork` loses its `observe` option (no longer needed — the manifest-anchored wake is the delivery path). Horton's "When to fork (vs spawn_worker)" prompt section reframes fork as the sibling primitive to spawn_worker — both create a child the parent owns and gets replies from; the difference is only what the child boots with (empty context vs. inherited history). Adds an explicit trigger pattern ("prefer fork when generating multiple variants the user wants to compare; don't inline") to route the "give me three takes" shape to fork instead of collapsing it into one inline response (the regression observed on the first end-to-end test of the previous sibling-fork version). Also fixes a chat-render gap exposed by agent-to-agent sends: the `send` tool's `payload` description now spells out the canonical `{ text: "..." }` shape, and `readInboxText` falls back to `message` / `content` keys when `text` isn't present — so a fork that receives a follow-up prompt via `send` actually shows the prompt in the chat instead of rendering a blank bubble. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/fork-tool-horton.md | 4 +- .changeset/fork-tool-runtime.md | 6 +- .changeset/fork-tool-server.md | 7 +- .../agents-runtime/src/context-factory.ts | 12 +--- packages/agents-runtime/src/process-wake.ts | 69 ++++++++++++++----- .../src/runtime-server-client.ts | 33 ++++++++- packages/agents-runtime/src/setup-context.ts | 27 +++++++- packages/agents-runtime/src/tools/send.ts | 2 +- packages/agents-runtime/src/types.ts | 17 ++--- .../src/components/EntityTimeline.tsx | 15 +++- .../agents-server-ui/src/lib/sendMessage.ts | 10 ++- packages/agents-server/src/entity-manager.ts | 50 ++++++++++++++ .../src/routing/entities-router.ts | 23 +++++++ packages/agents/src/agents/horton.ts | 17 ++--- packages/agents/src/tools/fork.ts | 8 ++- 15 files changed, 241 insertions(+), 59 deletions(-) diff --git a/.changeset/fork-tool-horton.md b/.changeset/fork-tool-horton.md index 5c93cca162..7bf86a77c8 100644 --- a/.changeset/fork-tool-horton.md +++ b/.changeset/fork-tool-horton.md @@ -2,4 +2,6 @@ '@electric-ax/agents': patch --- -Give Horton a `fork` tool so an agent can create a sibling session that inherits the current conversation's history up to its latest completed response. The tool takes an optional `entityUrl` (omit for self-fork) and delegates to `ctx.fork`, which auto-observes the new fork with `runFinished` + `includeResponse` so the caller wakes when the fork's next run finishes. Horton's system prompt grows a "When to fork (vs spawn_worker)" section framing the distinction — spawn for isolated subtasks with empty context, fork for parallel exploration that needs the conversation's full history — plus the end-turn-first / send-different-prompts / wait-for-all-responses workflow for parallel-exploration patterns (analyze, fork N times, synthesise the winner). +Give Horton a `fork` tool that creates a child session inheriting this conversation's history up to the latest completed response. Takes an optional `entityUrl` (omit for self-fork) and delegates to `ctx.fork`, which makes the fork a CHILD of the calling entity (same parent-ownership model as `spawn_worker`) and wires reply delivery through the same manifest-anchored wake — when the fork's next run finishes, the parent wakes with the response in the wake message. + +Horton's system prompt grows a "When to fork (vs spawn_worker)" section framing the two tools as a pair: both create a child the parent owns and gets replies from, the difference is what the child boots with — `spawn_worker` starts with an empty context (you brief it from scratch), `fork` starts with a copy of the conversation up to the latest completed response. Includes an explicit trigger pattern ("prefer fork when generating multiple variants the user wants to compare; don't inline") to route "give me three takes" / "evaluate these N approaches" prompts to fork rather than collapsing them into one inline response, plus the end-turn-first / send-different-prompts / wait-for-all-responses workflow for the parallel-exploration loop. diff --git a/.changeset/fork-tool-runtime.md b/.changeset/fork-tool-runtime.md index 73d0a204d2..98a290658d 100644 --- a/.changeset/fork-tool-runtime.md +++ b/.changeset/fork-tool-runtime.md @@ -2,4 +2,8 @@ '@electric-ax/agents-runtime': patch --- -Add `ctx.fork(targetEntityUrl?, opts?)` to `HandlerContext`. Calls the agents-server fork endpoint with `anchor: 'latest_completed_run'` to create a sibling session that inherits the source's history up to the most recent completed run. Defaults `targetEntityUrl` to `ctx.entityUrl` (self-fork). Auto-observes the new fork with `wake: { on: 'runFinished', includeResponse: true }` so the caller wakes when the fork's next run finishes; pass `observe: false` for fire-and-forget. Wired through `RuntimeServerClient.forkEntity` and a new `WiringConfig.forkEntity` injection point alongside `createOrGetChild`. +Add `ctx.fork(targetEntityUrl?)` to `HandlerContext`. Calls the agents-server fork endpoint with `anchor: 'latest_completed_run'`, `parent: ctx.entityUrl`, and a `runFinished + includeResponse` wake — so the new fork is a CHILD of this entity (same parent-ownership model as `ctx.spawn`) and reports back via the same manifest-anchored wake mechanism `spawn` uses. Defaults `targetEntityUrl` to `ctx.entityUrl` for self-fork. + +Internally writes a `kind: 'child'` manifest row on the parent's `main` stream alongside the server-side wake registration, mirroring the spawn flow's bookkeeping so the relationship persists across wakes. Wired through new `parent` + `wake` fields on `RuntimeServerClient.forkEntity` and `WiringConfig.forkEntity`. + +The `send` tool's `payload` description now documents the canonical `{ text: "..." }` shape for chat-rendered targets (Horton sessions, agent forks) so messages emitted by `send` render as chat bubbles instead of blank bars. diff --git a/.changeset/fork-tool-server.md b/.changeset/fork-tool-server.md index e014fa5c2d..4fa08fd0ad 100644 --- a/.changeset/fork-tool-server.md +++ b/.changeset/fork-tool-server.md @@ -3,4 +3,9 @@ '@electric-ax/agents-server-ui': patch --- -Add a server-resolved fork anchor for `POST /_electric/entities///fork`. The fork body now accepts an optional `anchor: 'latest_completed_run'` field as an alternative to `fork_pointer`: the server scans the source root's `main` history, finds the most recent `runs` row with `status === 'completed'`, derives the matching `{ offset, sub_offset }` pointer, and runs the existing pointer-fork path with it. Mutually exclusive with `fork_pointer` (400 if both); errors with 400 if the source has no completed run. Lets callers without access to the source's per-row pointer side-table (e.g. an agent forking a session via a tool) still fork at the same anchor the per-row "Fork from here" UI uses. +Add server-resolved fork anchor + parent/wake fields to `POST /_electric/entities///fork`. + +- `anchor: 'latest_completed_run'` is an alternative to `fork_pointer`: the server scans the source root's `main` history, finds the most recent `runs` row with `status === 'completed'`, derives the matching `{ offset, sub_offset }` pointer, and runs the existing pointer-fork path with it. Mutually exclusive with `fork_pointer` (400 if both); 400 if no completed run exists. Lets callers without access to the source's per-row pointer side-table (e.g. an agent forking via a tool) fork at the same anchor the per-row "Fork from here" UI uses. +- `parent` and `wake` mirror the corresponding `spawn` body fields. When `parent` is set, the new root fork is a CHILD of that URL (rather than inheriting the source's parent). `wake` registers a subscription on the new root fork at fork time (same shape as `spawn`'s `wake`). Together these let an agent fork itself as a child and receive replies via the same manifest-anchored wake mechanism `spawn` uses. + +Chat UI: `readInboxText` falls back to `message` and `content` keys when `text` isn't present, so messages sent by agents (which sometimes emit those shapes) render as a chat bubble body instead of a blank bar. diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index 98412984f9..ae76ee2fc1 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -102,10 +102,7 @@ export interface HandlerContextConfig { observe?: boolean } ) => Promise - doFork: ( - targetEntityUrl: string, - opts?: { observe?: boolean } - ) => Promise<{ url: string }> + doFork: (targetEntityUrl: string) => Promise<{ url: string }> doMkdb: ( id: string, schema: TSchema @@ -762,11 +759,8 @@ export function createHandlerContext( ): Promise { return config.doSpawn(type, id, args, opts) }, - fork( - targetEntityUrl?: string, - opts?: { observe?: boolean } - ): Promise<{ url: string }> { - return config.doFork(targetEntityUrl ?? config.entityUrl, opts) + fork(targetEntityUrl?: string): Promise<{ url: string }> { + return config.doFork(targetEntityUrl ?? config.entityUrl) }, mkdb( id: string, diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index a180f83b23..ddc470b085 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -7,6 +7,7 @@ import { normalizeObservationSchema } from './observation-schema' import { createWakeSession } from './wake-session' import { createHandlerContext } from './context-factory' import { createSetupContext } from './setup-context' +import type { WiringConfig } from './setup-context' import { createEntityLogPrefix, runtimeLog } from './log' import { createRuntimeServerClient } from './runtime-server-client' import { unrestrictedSandbox } from './sandbox/unrestricted' @@ -17,7 +18,7 @@ import { buildHydratedEventSourceWake, eventSourceWakeInfoFromManifests, } from './event-sources' -import { entity, webhookObservationCollections } from './observation-sources' +import { webhookObservationCollections } from './observation-sources' import type { HydratedEventSourceWake } from './event-sources' import { SandboxError } from './sandbox/types' import type { Sandbox } from './sandbox/types' @@ -1329,9 +1330,14 @@ export async function processWake( }, forkEntity: async ( - sourceEntityUrl: string + sourceEntityUrl: string, + opts?: Parameters[1] ): Promise<{ entityUrl: string; streamPath: string }> => { - const result = await serverClient.forkEntity({ sourceEntityUrl }) + const result = await serverClient.forkEntity({ + sourceEntityUrl, + ...(opts?.parent !== undefined && { parent: opts.parent }), + ...(opts?.wake !== undefined && { wake: opts.wake }), + }) return { entityUrl: result.entityUrl, streamPath: result.streamPath } }, @@ -1757,21 +1763,50 @@ export async function processWake( } const doFork = async ( - targetEntityUrl: string, - opts?: { observe?: boolean } + targetEntityUrl: string ): Promise<{ url: string }> => { - const { entityUrl: forkUrl } = - await wiringConfig.forkEntity(targetEntityUrl) - // Auto-observe by default: register a runFinished wake on the new - // fork so the caller wakes when the fork's next run completes. - // Mirrors `spawn_worker`'s default wake. Opt out with - // `observe: false` for fire-and-forget forks. - if (opts?.observe !== false) { - await doObserve(entity(forkUrl), { - on: `runFinished`, - includeResponse: true, - }) - } + // Child-fork: the new fork is created with `parent = me`, and a + // `runFinished + includeResponse` wake subscription is registered + // on it server-side, mirroring the spawn flow. Reply delivery + // back to the parent uses the same manifest-anchored wake the + // spawn flow uses — the parent wakes when the fork's next run + // finishes, and the wake message carries the fork's response. + // + // We don't know the new fork's URL until the server assigns it, + // so the manifest entry is keyed on the assigned URL after the + // fork creates. (Contrast: spawn knows the id up front and pre- + // registers a placeholder.) + const tentativeWake = { + subscriberUrl: entityUrl, + condition: `runFinished` as const, + includeResponse: true, + } + const { entityUrl: forkUrl } = await wiringConfig.forkEntity( + targetEntityUrl, + { + parent: entityUrl, + wake: tentativeWake, + } + ) + const segments = forkUrl.split(`/`).filter(Boolean) + const forkType = segments[0] ?? `unknown` + const forkId = segments[1] ?? forkUrl + const childKey = manifestChildKey(forkType, forkId) + // Patch the registered wake's manifestKey now that we know the + // assigned URL — the server-side registration doesn't include it + // because we didn't have it pre-fork. Server-side cleanup paths + // already match wakes by subscriberUrl + sourceUrl, so leaving + // manifestKey unset is safe; we still mirror the value into the + // manifest entry for parity with spawn's bookkeeping. + wakeSession.registerManifestEntry({ + kind: `child`, + key: childKey, + id: forkId, + entity_type: forkType, + entity_url: forkUrl, + observed: true, + wake: { on: `runFinished`, includeResponse: true }, + }) return { url: forkUrl } } diff --git a/packages/agents-runtime/src/runtime-server-client.ts b/packages/agents-runtime/src/runtime-server-client.ts index 0a5cd86b5c..1e33195e71 100644 --- a/packages/agents-runtime/src/runtime-server-client.ts +++ b/packages/agents-runtime/src/runtime-server-client.ts @@ -125,9 +125,24 @@ export interface RuntimeServerClient { * Fork an entity at the server-resolved `latest_completed_run` anchor. * Resolves to the new root entity's info. Wraps the agents-server * `POST /_electric/entities///fork` endpoint. + * + * Optional `parent` makes the new fork a child of that URL; pair with + * `wake` to register a subscription on the new fork at fork time + * (mirrors the spawn flow's `parent` + `wake` plumbing). Reply + * delivery to the parent uses the same manifest-anchored wake the + * spawn flow uses. */ forkEntity: (options: { sourceEntityUrl: string + parent?: string + wake?: { + subscriberUrl: string + condition: RegisterWakeOptions[`condition`] + debounceMs?: number + timeoutMs?: number + includeResponse?: boolean + manifestKey?: string + } }) => Promise getEntity: (entityUrl: string) => Promise ensureSharedStateStream: ( @@ -459,13 +474,29 @@ export function createRuntimeServerClient( const forkEntity = async ({ sourceEntityUrl, + parent, + wake, }: { sourceEntityUrl: string + parent?: string + wake?: { + subscriberUrl: string + condition: RegisterWakeOptions[`condition`] + debounceMs?: number + timeoutMs?: number + includeResponse?: boolean + manifestKey?: string + } }): Promise => { + const body: Record = { + anchor: `latest_completed_run`, + } + if (parent !== undefined) body.parent = parent + if (wake !== undefined) body.wake = wake const response = await request(entityRpcPath(sourceEntityUrl, `/fork`), { method: `POST`, headers: { 'content-type': `application/json` }, - body: JSON.stringify({ anchor: `latest_completed_run` }), + body: JSON.stringify(body), }) if (!response.ok) { throw new Error( diff --git a/packages/agents-runtime/src/setup-context.ts b/packages/agents-runtime/src/setup-context.ts index bf95edcf29..93ee3268b7 100644 --- a/packages/agents-runtime/src/setup-context.ts +++ b/packages/agents-runtime/src/setup-context.ts @@ -45,6 +45,7 @@ import type { Wake, WakeSession, } from './types' +import type { TagOperation } from './tags' interface EffectScope { register: ( @@ -71,9 +72,33 @@ export interface WiringConfig { /** * Fork a top-level entity at the server-resolved latest completed * run. Returns the new root entity's URL + main stream path. + * + * Optional `parent` makes the new fork a child of that URL; pair with + * `wake` to register a subscription at fork time. The `condition` + * here is the agents-server's normalized wake shape (`'runFinished'` + * or `{ on: 'change', ... }`) — callers above this layer (`doFork`) + * translate the user-facing `Wake` into this form, the same way + * createOrGetChild does for spawn. */ forkEntity: ( - sourceEntityUrl: string + sourceEntityUrl: string, + opts?: { + parent?: string + wake?: { + subscriberUrl: string + condition: + | `runFinished` + | { + on: `change` + collections?: Array + ops?: Array + } + debounceMs?: number + timeoutMs?: number + includeResponse?: boolean + manifestKey?: string + } + } ) => Promise<{ entityUrl: string; streamPath: string }> /** Create a child StreamDB, preload it, and register it for cleanup. */ createChildDb: ( diff --git a/packages/agents-runtime/src/tools/send.ts b/packages/agents-runtime/src/tools/send.ts index 2a58b5961e..09d7cdedd5 100644 --- a/packages/agents-runtime/src/tools/send.ts +++ b/packages/agents-runtime/src/tools/send.ts @@ -45,7 +45,7 @@ export function createSendTool( }) ), payload: Type.Any({ - description: `Message payload to deliver to the target entity.`, + description: `Message payload to deliver to the target entity. For chat-rendered targets (Horton sessions, your own forks), use the shape \`{ text: "..." }\` so the message body shows up in the chat UI — that's the same shape the user's chat input produces. A plain string also works. Other shapes are delivered as-is but won't render as a chat bubble body.`, }), type: Type.Optional( Type.String({ description: `Optional message type.` }) diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index 40960bb1bb..6a20b8a638 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -990,17 +990,14 @@ export interface HandlerContext< /** * Fork a session at the latest completed run on its `main` stream. * Defaults to this entity (self-fork) when `targetEntityUrl` is - * omitted. The fork is a sibling — not a child — of the source. - * - * `observe: true` (the default) registers an observation on the new - * fork wired to wake this entity on the fork's next `runFinished`. - * Pass `observe: false` for fire-and-forget forks where the caller - * never plans to react to the fork's completion. + * omitted. The new fork is created as a CHILD of this entity (same + * parent-ownership model as `spawn`), and a `runFinished + + * includeResponse` wake is registered on it at fork time. Reply + * delivery uses the parent's manifest-anchored wake — the same + * mechanism `spawn` uses — so when the fork's next run finishes, + * this entity wakes with the response in the wake message. */ - fork: ( - targetEntityUrl?: string, - opts?: { observe?: boolean } - ) => Promise<{ url: string }> + fork: (targetEntityUrl?: string) => Promise<{ url: string }> observe: (( source: ObservationSource & { sourceType: `entity` }, opts?: { wake?: Wake } diff --git a/packages/agents-server-ui/src/components/EntityTimeline.tsx b/packages/agents-server-ui/src/components/EntityTimeline.tsx index 7593b6e44a..968991f6df 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.tsx +++ b/packages/agents-server-ui/src/components/EntityTimeline.tsx @@ -75,11 +75,20 @@ function renderRowKey(row: RenderTimelineRow): string { } function readInboxText(payload: unknown): string { + if (typeof payload === `string`) return payload if (payload && typeof payload === `object`) { - const text = (payload as { text?: unknown }).text - if (typeof text === `string`) return text + // Prefer the canonical `text` key (what the chat input emits and what + // the `send` tool's description recommends), then fall back to + // `message` / `content` since agents sometimes emit those when the + // shape guidance isn't internalised. Keeps casually-shaped agent-to- + // agent sends visible in the chat instead of rendering as a blank. + const candidates = [`text`, `message`, `content`] as const + for (const key of candidates) { + const value = (payload as Record)[key] + if (typeof value === `string`) return value + } } - return typeof payload === `string` ? payload : `` + return `` } function stringifySearchPayload(value: unknown): string { diff --git a/packages/agents-server-ui/src/lib/sendMessage.ts b/packages/agents-server-ui/src/lib/sendMessage.ts index eee1cec177..f84de05530 100644 --- a/packages/agents-server-ui/src/lib/sendMessage.ts +++ b/packages/agents-server-ui/src/lib/sendMessage.ts @@ -292,11 +292,15 @@ export async function sendEntityMessage({ } export function readTextPayload(payload: unknown): string { + if (typeof payload === `string`) return payload if (payload && typeof payload === `object`) { - const text = (payload as { text?: unknown }).text - if (typeof text === `string`) return text + const candidates = [`text`, `message`, `content`] as const + for (const key of candidates) { + const value = (payload as Record)[key] + if (typeof value === `string`) return value + } } - return typeof payload === `string` ? payload : `` + return `` } function principalUrl(principalKey: string): string { diff --git a/packages/agents-server/src/entity-manager.ts b/packages/agents-server/src/entity-manager.ts index d46d0cf734..56b99a3490 100644 --- a/packages/agents-server/src/entity-manager.ts +++ b/packages/agents-server/src/entity-manager.ts @@ -185,6 +185,21 @@ type ForkSubtreeOptions = { * is the eligibility rule the per-row "Fork from here" UI uses. */ anchor?: `latest_completed_run` + /** + * Optional parent URL for the new root fork entity. When set, the + * new fork is a CHILD of this URL — its `parent` field is overridden + * (rather than inherited from the source, which is `null` for the + * only allowed source: a top-level entity). Pairs with `wake` to give + * the parent reply-delivery the same way spawn does. + */ + parent?: string + /** + * Optional wake subscription registered on the new root fork at fork + * time. Same shape and semantics as `spawn`'s `wake` (see + * `TypedSpawnRequest.wake`). Typically set by the agent `fork` tool + * to wire reply delivery via the parent's manifest-anchored wake. + */ + wake?: TypedSpawnRequest[`wake`] } type ForkEntityPlan = { @@ -1096,6 +1111,17 @@ export class EntityManager { stringMap ) + // Override the new root fork's parent when the caller asked for a + // child fork. Plumbed through from `forkBodySchema.parent`; the + // descendants in `effectiveSubtree` keep their original parent + // links (remapped via `entityUrlMap` in buildForkEntityPlans). + if (opts.parent !== undefined) { + const rootPlan = entityPlans.find((plan) => plan.source.url === rootUrl) + if (rootPlan) { + rootPlan.fork.parent = opts.parent + } + } + this.addForkLocks( this.forkWriteLockedEntities, effectiveSubtree.map((entity) => entity.url), @@ -1183,6 +1209,30 @@ export class EntityManager { createdEntities.push(plan.fork.url) } + // Register a wake subscription on the new root fork when the + // caller asked for one. Mirrors the spawn flow's wake handling + // (entity-manager.spawnInner). Rollback below already calls + // `wakeRegistry.unregisterBySource(forkUrl)` for every created + // entity, so this cleans up automatically on failure. + if (opts.wake !== undefined) { + const rootPlan = entityPlans.find( + (plan) => plan.source.url === rootUrl + ) + if (rootPlan) { + await this.wakeRegistry.register({ + tenantId: this.tenantId, + subscriberUrl: opts.wake.subscriberUrl, + sourceUrl: rootPlan.fork.url, + condition: opts.wake.condition, + debounceMs: opts.wake.debounceMs, + timeoutMs: opts.wake.timeoutMs, + oneShot: false, + includeResponse: opts.wake.includeResponse, + manifestKey: opts.wake.manifestKey, + }) + } + } + for (const plan of entityPlans) { const manifests = activeManifestsByEntity.get(plan.fork.url) ?? new Map() diff --git a/packages/agents-server/src/routing/entities-router.ts b/packages/agents-server/src/routing/entities-router.ts index 0f190e4916..81bd1685ad 100644 --- a/packages/agents-server/src/routing/entities-router.ts +++ b/packages/agents-server/src/routing/entities-router.ts @@ -200,6 +200,27 @@ const forkBodySchema = Type.Object({ // to the source's per-row pointer side-table. Mutually exclusive with // `fork_pointer`. anchor: Type.Optional(Type.Literal(`latest_completed_run`)), + // Optional parent URL. When set, the new root fork is a CHILD of this + // URL (rather than inheriting the source's parent, which is `null` + // for the only allowed source — a top-level entity). Used by the + // agent `fork` tool to make a forking entity own its forks the same + // way a spawning entity owns its workers. + parent: Type.Optional(Type.String()), + // Optional wake subscription registered on the new root fork at fork + // time. Mirrors the `wake` field on spawn — the subscriber URL gets + // woken when the fork meets the named condition, with the response + // optionally inlined. Used to wire fork-as-child reply delivery + // through the same manifest-anchored mechanism spawn uses. + wake: Type.Optional( + Type.Object({ + subscriberUrl: Type.String(), + condition: wakeConditionSchema, + debounceMs: Type.Optional(Type.Number()), + timeoutMs: Type.Optional(Type.Number()), + includeResponse: Type.Optional(Type.Boolean()), + manifestKey: Type.Optional(Type.String()), + }) + ), }) const setTagBodySchema = Type.Object({ @@ -1088,6 +1109,8 @@ async function forkEntity( }, }), ...(parsed.anchor && { anchor: parsed.anchor }), + ...(parsed.parent !== undefined && { parent: parsed.parent }), + ...(parsed.wake !== undefined && { wake: parsed.wake }), }) for (const forkedEntity of result.entities) { await linkEntityDispatchSubscription(ctx, forkedEntity) diff --git a/packages/agents/src/agents/horton.ts b/packages/agents/src/agents/horton.ts index f8bc98749d..eae36ba572 100644 --- a/packages/agents/src/agents/horton.ts +++ b/packages/agents/src/agents/horton.ts @@ -240,7 +240,7 @@ When a user opens with a greeting ("hi", "hello", "hey", etc.) or a broad statem - web_search: search the web - fetch_url: fetch and convert a URL to markdown - spawn_worker: dispatch a subagent for an isolated task -- fork: create a sibling session that inherits this conversation's history up to the latest completed response. Auto-observes — you'll wake when the fork's next run finishes. +- fork: spawn a child session that inherits this conversation's history up to the latest completed response. Same parent-ownership model as spawn_worker — when the fork's next run finishes, you'll wake with its response. - send: send a message to an Electric Agent/entity. To schedule future work for yourself, call send with self: true and afterMs. ${eventSourceTools}${docsTools}${skillsTools} @@ -270,18 +270,19 @@ When you spawn a worker, write its system prompt the way you'd brief a colleague After spawning, end your turn (optionally with a brief "I've dispatched a worker for X; I'll respond when it finishes"). When the worker finishes, you'll receive a message describing which worker completed and what it returned. Multiple workers may finish at different times — check the message for the worker URL to know which one you're hearing about. # When to fork (vs spawn_worker) -The two tools solve different problems: -- **spawn_worker** creates a NEW subagent with an EMPTY context. You brief it with a system prompt and an initial message from scratch. Use it to delegate an isolated subtask where the worker doesn't need our conversation history. -- **fork** creates a sibling SESSION that inherits THIS conversation's full history up to your latest completed response. Use it to explore multiple alternative continuations of the same conversation in parallel — when each branch needs to "know what we already said," not just a brief. +\`fork\` is the **sibling primitive to spawn_worker** — both create a child you own, both report back to you on a future wake. The difference is only in what the child boots with: -Typical fork use cases: trying two or three different answers to the same question and comparing, A/B-testing an approach at a decision point, or any "what if I'd responded differently here" exploration. +- **spawn_worker** → child boots with an **empty context**; you brief it from scratch via a system prompt + initial message. Use when the worker doesn't need to know what we've said so far. +- **fork** → child boots with **a copy of THIS conversation's history** up to your latest completed response. Use when each child needs to know what we've already established (the user's framing, your prior analysis, an earlier decision, the constraints, etc.). + +**Trigger pattern: prefer fork when generating multiple variants the user wants to compare.** If the user asks for "three different X" or "two takes on Y" or "evaluate these N approaches" and each variant should reflect the conversation we've had so far, **don't inline the variants in one response** — fork once per variant, send each a tailored follow-up, and synthesize when they report back. Inlining feels faster but the variants end up cross-contaminating in your single response; forks keep them honestly independent. The exception is trivial generation (a list of names, a couple of one-liners) where each variant takes a sentence — there, inline is fine. Workflow when forking yourself for parallel exploration: 1. **End your current turn first.** The fork's history stops at your *latest completed* run. Anything you say mid-turn is NOT in the fork. If you want your analysis baked into each fork, finish it and end the turn before calling fork. -2. On the next wake, call \`fork\` once per branch you want to explore (or pass an \`entityUrl\` to fork a different session). +2. On the next wake, call \`fork\` once per branch. Each fork is YOUR child, just like a spawned worker. 3. For each fork URL the tool returned, use \`send\` to deliver a *different* follow-up prompt — that's how the branches diverge from a shared starting point. -4. End your turn. You'll wake automatically when each fork's run finishes; the wake message includes the fork's response. -5. If you're waiting on multiple forks, don't synthesize on the first wake — quietly end the turn with something like "got N of M, waiting" until you have what you need to compare. +4. End your turn. You'll wake automatically when each fork's run finishes (same wake mechanism as spawn_worker); the wake message identifies the fork and includes its response. +5. If you're waiting on multiple forks, don't synthesize on the first wake — quietly end the turn with "got N of M, waiting" until you have what you need to compare. # Reporting Report outcomes faithfully. If a command failed, say so with the relevant output. If you didn't run a verification step, say that rather than implying you did. Don't hedge confirmed results with unnecessary disclaimers. diff --git a/packages/agents/src/tools/fork.ts b/packages/agents/src/tools/fork.ts index 6925b89617..1e414bd26e 100644 --- a/packages/agents/src/tools/fork.ts +++ b/packages/agents/src/tools/fork.ts @@ -7,11 +7,13 @@ export function createForkTool(ctx: HandlerContext): AgentTool { return { name: `fork`, label: `Fork`, - description: `Fork a session at its latest completed agent response, producing a sibling copy of the conversation up to that point. The new session boots idle — use the existing 'send' tool to dispatch a follow-up prompt to it. The fork is auto-observed: when its next run finishes you will be woken with the response, so end your turn after forking. + description: `Fork a session at its latest completed agent response, producing a child copy of the conversation up to that point. The new fork is YOUR child — same parent-ownership model as a spawned worker — and it reports back to you the same way: when its next run finishes you'll be woken with its response. End your turn after forking. + +The fork boots idle. Use the existing 'send' tool to dispatch a follow-up prompt to it; the fork will run, finish, and you'll wake with the response just like with spawn_worker. Use this to explore multiple alternative continuations in parallel from the same starting point. End your current turn first so the fork includes your latest response — the anchor is always the most recently completed run. -Omit 'entityUrl' to fork your own session. Pass a different session's URL to fork that session instead.`, +Omit 'entityUrl' to fork your own session. Pass a different session's URL to fork that session instead (the new fork is still your child).`, parameters: Type.Object({ entityUrl: Type.Optional( Type.String({ @@ -27,7 +29,7 @@ Omit 'entityUrl' to fork your own session. Pass a different session's URL to for content: [ { type: `text` as const, - text: `Forked at ${url}. The fork boots idle — use the 'send' tool to dispatch a follow-up prompt. End your turn afterwards; you'll wake when the fork's next run finishes.`, + text: `Forked at ${url}. The fork is your child and boots idle — use the 'send' tool to dispatch a follow-up prompt. End your turn afterwards; you'll wake with the fork's response when its next run finishes (same as a spawned worker).`, }, ], details: { forked: true, forkUrl: url }, From bffdf19d93859ad70a0a26a39169f524a28f50d5 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 4 Jun 2026 11:13:48 +0200 Subject: [PATCH 03/10] refactor(fork): take an opts bag instead of a positional targetEntityUrl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `ctx.fork` was `(targetEntityUrl?: string) => …` — a single optional positional arg. Future knobs (the deferred explicit-anchor parameter) would force callers to thread `undefined` through the positional spot to set a later arg. Switch to an options bag now so the upgrade path is additive. `ctx.fork()` self-fork stays a no-arg call. Cross-entity fork becomes `ctx.fork({ targetEntityUrl: '...' })`. Threaded through `HandlerContextConfig.doFork` and the agents `fork` tool wrapper. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/agents-runtime/src/context-factory.ts | 8 +++++--- packages/agents-runtime/src/process-wake.ts | 5 +++-- packages/agents-runtime/src/types.ts | 13 +++++++++---- packages/agents/src/tools/fork.ts | 4 +++- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index ae76ee2fc1..f0a1c0f4cd 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -102,7 +102,7 @@ export interface HandlerContextConfig { observe?: boolean } ) => Promise - doFork: (targetEntityUrl: string) => Promise<{ url: string }> + doFork: (opts: { targetEntityUrl: string }) => Promise<{ url: string }> doMkdb: ( id: string, schema: TSchema @@ -759,8 +759,10 @@ export function createHandlerContext( ): Promise { return config.doSpawn(type, id, args, opts) }, - fork(targetEntityUrl?: string): Promise<{ url: string }> { - return config.doFork(targetEntityUrl ?? config.entityUrl) + fork(opts?: { targetEntityUrl?: string }): Promise<{ url: string }> { + return config.doFork({ + targetEntityUrl: opts?.targetEntityUrl ?? config.entityUrl, + }) }, mkdb( id: string, diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index ddc470b085..742bde97a2 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -1762,9 +1762,10 @@ export async function processWake( return setupCtx.spawn(type, id, spawnArgs, opts) } - const doFork = async ( + const doFork = async (opts: { targetEntityUrl: string - ): Promise<{ url: string }> => { + }): Promise<{ url: string }> => { + const { targetEntityUrl } = opts // Child-fork: the new fork is created with `parent = me`, and a // `runFinished + includeResponse` wake subscription is registered // on it server-side, mirroring the spawn flow. Reply delivery diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index 6a20b8a638..bb3e67e2e9 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -989,15 +989,20 @@ export interface HandlerContext< ) => Promise /** * Fork a session at the latest completed run on its `main` stream. - * Defaults to this entity (self-fork) when `targetEntityUrl` is - * omitted. The new fork is created as a CHILD of this entity (same - * parent-ownership model as `spawn`), and a `runFinished + + * Defaults to this entity (self-fork) when `opts.targetEntityUrl` + * is omitted. The new fork is created as a CHILD of this entity + * (same parent-ownership model as `spawn`), and a `runFinished + * includeResponse` wake is registered on it at fork time. Reply * delivery uses the parent's manifest-anchored wake — the same * mechanism `spawn` uses — so when the fork's next run finishes, * this entity wakes with the response in the wake message. + * + * `opts` is an options bag (rather than a positional `targetEntityUrl` + * arg) so future knobs — e.g. an explicit-anchor parameter — can be + * added without making callers thread `undefined` through earlier + * positions. */ - fork: (targetEntityUrl?: string) => Promise<{ url: string }> + fork: (opts?: { targetEntityUrl?: string }) => Promise<{ url: string }> observe: (( source: ObservationSource & { sourceType: `entity` }, opts?: { wake?: Wake } diff --git a/packages/agents/src/tools/fork.ts b/packages/agents/src/tools/fork.ts index 1e414bd26e..11460f257c 100644 --- a/packages/agents/src/tools/fork.ts +++ b/packages/agents/src/tools/fork.ts @@ -24,7 +24,9 @@ Omit 'entityUrl' to fork your own session. Pass a different session's URL to for execute: async (_toolCallId, params) => { const { entityUrl } = params as { entityUrl?: string } try { - const { url } = await ctx.fork(entityUrl) + const { url } = await ctx.fork( + entityUrl !== undefined ? { targetEntityUrl: entityUrl } : undefined + ) return { content: [ { From e0c97cad234eec2a951ebbc5990e0a3b8bbb963e Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 4 Jun 2026 11:54:27 +0200 Subject: [PATCH 04/10] =?UTF-8?q?feat(fork):=20spawn-parity=20options=20on?= =?UTF-8?q?=20ctx.fork=20=E2=80=94=20initialMessage,=20wake,=20tags,=20obs?= =?UTF-8?q?erve?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bring `ctx.fork`'s opts shape into line with `ctx.spawn`'s. Sam's review flagged the asymmetry: spawn exposes a settled set of knobs for parent-owned child creation; fork — which is the same lifecycle pattern with a different starting context — was missing the corresponding ones. `ctx.fork(opts?)` now mirrors spawn where the semantics map: - `initialMessage?: unknown` — atomic dispatch. The fork's inbox row is delivered server-side via `entityManager.send` AFTER `linkEntityDispatchSubscription` runs (same ordering spawn uses), so the dispatcher is subscribed before the inbox lands and the fork actually wakes on it. Folds the previous fork+send two-step into one tool call and closes the "send fails, parent has an idle fork on its manifest" reliability gap. - `wake?: Wake` — overrides the default `runFinished + includeResponse`. Translated through the same `normalizeWake` logic createOrGetChild uses for spawn. - `tags?: Record` — stamped on the new root fork in addition to those copied from the source. - `observe?: boolean` (default `true`) — `false` opts out of the parent relationship entirely: no parent URL, no wake registration, no manifest child row. Fire-and-forget mirror of spawn's semantics. Server side: `forkBodySchema` grows the corresponding fields; the route handler delivers `initialMessage` via `entityManager.send(rootUrl, …)` after the link-dispatch loop, matching spawn's pattern exactly. `sandbox` is deferred to a follow-up — applying it requires the `resolveSandboxForSpawn` resolver chain (inherit handling, runner allowance), which is a non-trivial surface and not load-bearing for the current demo flow. Tool side: exposes `initialMessage` and `tags` to the model; keeps `wake`/`observe` runtime-only. Updates the description and Horton's system prompt to lead with `initialMessage` so the parallel-exploration loop becomes "fork-with-prompt N times" instead of "fork then send N times." Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/fork-tool-horton.md | 4 +- .changeset/fork-tool-runtime.md | 16 ++- .changeset/fork-tool-server.md | 9 +- .../agents-runtime/src/context-factory.ts | 22 ++- packages/agents-runtime/src/process-wake.ts | 126 +++++++++++++----- .../src/runtime-server-client.ts | 22 ++- packages/agents-runtime/src/setup-context.ts | 2 + packages/agents-runtime/src/types.ts | 20 ++- packages/agents-server/src/entity-manager.ts | 32 +++-- .../src/routing/entities-router.ts | 20 +++ packages/agents/src/agents/horton.ts | 7 +- packages/agents/src/tools/fork.ts | 32 ++++- 12 files changed, 239 insertions(+), 73 deletions(-) diff --git a/.changeset/fork-tool-horton.md b/.changeset/fork-tool-horton.md index 7bf86a77c8..4e04df8113 100644 --- a/.changeset/fork-tool-horton.md +++ b/.changeset/fork-tool-horton.md @@ -2,6 +2,6 @@ '@electric-ax/agents': patch --- -Give Horton a `fork` tool that creates a child session inheriting this conversation's history up to the latest completed response. Takes an optional `entityUrl` (omit for self-fork) and delegates to `ctx.fork`, which makes the fork a CHILD of the calling entity (same parent-ownership model as `spawn_worker`) and wires reply delivery through the same manifest-anchored wake — when the fork's next run finishes, the parent wakes with the response in the wake message. +Give Horton a `fork` tool that creates a child session inheriting this conversation's history up to the latest completed response. Takes an optional `entityUrl` (omit for self-fork), an optional `initialMessage` (delivered atomically — no follow-up `send` needed), and optional `tags`. The fork is created as a CHILD of the calling entity (same parent-ownership model as `spawn_worker`) and wires reply delivery through the same manifest-anchored wake — when the fork's next run finishes, the parent wakes with the response in the wake message. -Horton's system prompt grows a "When to fork (vs spawn_worker)" section framing the two tools as a pair: both create a child the parent owns and gets replies from, the difference is what the child boots with — `spawn_worker` starts with an empty context (you brief it from scratch), `fork` starts with a copy of the conversation up to the latest completed response. Includes an explicit trigger pattern ("prefer fork when generating multiple variants the user wants to compare; don't inline") to route "give me three takes" / "evaluate these N approaches" prompts to fork rather than collapsing them into one inline response, plus the end-turn-first / send-different-prompts / wait-for-all-responses workflow for the parallel-exploration loop. +Horton's system prompt grows a "When to fork (vs spawn_worker)" section framing the two tools as a pair: both create a child the parent owns and gets replies from, the difference is what the child boots with — `spawn_worker` starts with an empty context (you brief it from scratch), `fork` starts with a copy of the conversation up to the latest completed response. Includes an explicit trigger pattern ("prefer fork when generating multiple variants the user wants to compare; don't inline") to route "give me three takes" / "evaluate these N approaches" prompts to fork rather than collapsing them into one inline response, plus the workflow for the parallel-exploration loop (end-turn-first, fork-once-per-branch with a different `initialMessage` each, wait for all responses before synthesising). diff --git a/.changeset/fork-tool-runtime.md b/.changeset/fork-tool-runtime.md index 98a290658d..6b297b2aeb 100644 --- a/.changeset/fork-tool-runtime.md +++ b/.changeset/fork-tool-runtime.md @@ -2,8 +2,20 @@ '@electric-ax/agents-runtime': patch --- -Add `ctx.fork(targetEntityUrl?)` to `HandlerContext`. Calls the agents-server fork endpoint with `anchor: 'latest_completed_run'`, `parent: ctx.entityUrl`, and a `runFinished + includeResponse` wake — so the new fork is a CHILD of this entity (same parent-ownership model as `ctx.spawn`) and reports back via the same manifest-anchored wake mechanism `spawn` uses. Defaults `targetEntityUrl` to `ctx.entityUrl` for self-fork. +Add `ctx.fork(opts?)` to `HandlerContext`, with an opts shape that mirrors `ctx.spawn`'s where the semantics map: -Internally writes a `kind: 'child'` manifest row on the parent's `main` stream alongside the server-side wake registration, mirroring the spawn flow's bookkeeping so the relationship persists across wakes. Wired through new `parent` + `wake` fields on `RuntimeServerClient.forkEntity` and `WiringConfig.forkEntity`. +```ts +ctx.fork(opts?: { + targetEntityUrl?: string // omit for self-fork + initialMessage?: unknown // delivered atomically with creation + wake?: Wake // overrides the default runFinished + includeResponse + tags?: Record + observe?: boolean // `false` = fire-and-forget (no parent, no wake, no manifest entry) +}) +``` + +By default (`observe: true`), the new fork is a CHILD of this entity (same parent-ownership model as `ctx.spawn`), and a `runFinished + includeResponse` wake is registered on it server-side. Reply delivery uses the same manifest-anchored wake mechanism `ctx.spawn` uses — when the fork's next run finishes, this entity wakes with the response. `observe: false` opts out of the parent relationship entirely: no parent URL, no wake subscription, no manifest entry on the parent's stream. + +Internally writes a `kind: 'child'` manifest row on the parent's stream alongside the server-side wake registration, mirroring the spawn flow's bookkeeping so the relationship persists across wakes. Wired through new fields on `RuntimeServerClient.forkEntity` (`parent`, `wake`, `initialMessage`, `tags`) and `WiringConfig.forkEntity`. A `normalizeWake` helper translates the user-facing `Wake` type into the wakeRegistry-compatible shape, same logic `createOrGetChild` uses for spawn. The `send` tool's `payload` description now documents the canonical `{ text: "..." }` shape for chat-rendered targets (Horton sessions, agent forks) so messages emitted by `send` render as chat bubbles instead of blank bars. diff --git a/.changeset/fork-tool-server.md b/.changeset/fork-tool-server.md index 4fa08fd0ad..efccc8d150 100644 --- a/.changeset/fork-tool-server.md +++ b/.changeset/fork-tool-server.md @@ -3,9 +3,14 @@ '@electric-ax/agents-server-ui': patch --- -Add server-resolved fork anchor + parent/wake fields to `POST /_electric/entities///fork`. +Add server-resolved fork anchor + spawn-parity body fields to `POST /_electric/entities///fork`. - `anchor: 'latest_completed_run'` is an alternative to `fork_pointer`: the server scans the source root's `main` history, finds the most recent `runs` row with `status === 'completed'`, derives the matching `{ offset, sub_offset }` pointer, and runs the existing pointer-fork path with it. Mutually exclusive with `fork_pointer` (400 if both); 400 if no completed run exists. Lets callers without access to the source's per-row pointer side-table (e.g. an agent forking via a tool) fork at the same anchor the per-row "Fork from here" UI uses. -- `parent` and `wake` mirror the corresponding `spawn` body fields. When `parent` is set, the new root fork is a CHILD of that URL (rather than inheriting the source's parent). `wake` registers a subscription on the new root fork at fork time (same shape as `spawn`'s `wake`). Together these let an agent fork itself as a child and receive replies via the same manifest-anchored wake mechanism `spawn` uses. +- `parent` overrides the new root fork's `parent` field, making it a CHILD of that URL (rather than inheriting the source's parent). +- `wake` registers a subscription on the new root fork at fork time (same shape as `spawn`'s `wake`). +- `initialMessage` is delivered to the new root fork via `entityManager.send` after `linkEntityDispatchSubscription` runs — same ordering spawn uses, so the dispatcher is subscribed before the inbox row lands and the fork actually wakes on the message instead of sitting idle. +- `tags` are stamped on the new root fork in addition to those copied from the source. + +Together these let an agent fork itself as a child and receive replies via the same manifest-anchored wake mechanism `spawn` uses, with a single round-trip fork-and-dispatch. Chat UI: `readInboxText` falls back to `message` and `content` keys when `text` isn't present, so messages sent by agents (which sometimes emit those shapes) render as a chat bubble body instead of a blank bar. diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index f0a1c0f4cd..bbbc2d459a 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -102,7 +102,13 @@ export interface HandlerContextConfig { observe?: boolean } ) => Promise - doFork: (opts: { targetEntityUrl: string }) => Promise<{ url: string }> + doFork: (opts: { + targetEntityUrl: string + initialMessage?: unknown + wake?: Wake + tags?: Record + observe?: boolean + }) => Promise<{ url: string }> doMkdb: ( id: string, schema: TSchema @@ -759,9 +765,21 @@ export function createHandlerContext( ): Promise { return config.doSpawn(type, id, args, opts) }, - fork(opts?: { targetEntityUrl?: string }): Promise<{ url: string }> { + fork(opts?: { + targetEntityUrl?: string + initialMessage?: unknown + wake?: Wake + tags?: Record + observe?: boolean + }): Promise<{ url: string }> { return config.doFork({ targetEntityUrl: opts?.targetEntityUrl ?? config.entityUrl, + ...(opts?.initialMessage !== undefined && { + initialMessage: opts.initialMessage, + }), + ...(opts?.wake !== undefined && { wake: opts.wake }), + ...(opts?.tags !== undefined && { tags: opts.tags }), + ...(opts?.observe !== undefined && { observe: opts.observe }), }) }, mkdb( diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index 742bde97a2..54e959a072 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -1337,6 +1337,10 @@ export async function processWake( sourceEntityUrl, ...(opts?.parent !== undefined && { parent: opts.parent }), ...(opts?.wake !== undefined && { wake: opts.wake }), + ...(opts?.initialMessage !== undefined && { + initialMessage: opts.initialMessage, + }), + ...(opts?.tags !== undefined && { tags: opts.tags }), }) return { entityUrl: result.entityUrl, streamPath: result.streamPath } }, @@ -1764,53 +1768,101 @@ export async function processWake( const doFork = async (opts: { targetEntityUrl: string + initialMessage?: unknown + wake?: Wake + tags?: Record + observe?: boolean }): Promise<{ url: string }> => { - const { targetEntityUrl } = opts - // Child-fork: the new fork is created with `parent = me`, and a - // `runFinished + includeResponse` wake subscription is registered - // on it server-side, mirroring the spawn flow. Reply delivery - // back to the parent uses the same manifest-anchored wake the - // spawn flow uses — the parent wakes when the fork's next run - // finishes, and the wake message carries the fork's response. + const { + targetEntityUrl, + initialMessage, + wake: callerWake, + tags, + observe = true, + } = opts + // Child-fork (default `observe: true`): the new fork is created + // with `parent = me`, and a `runFinished + includeResponse` wake + // is registered on it server-side, mirroring the spawn flow. + // Reply delivery back to the parent uses the same manifest- + // anchored wake the spawn flow uses. // - // We don't know the new fork's URL until the server assigns it, - // so the manifest entry is keyed on the assigned URL after the - // fork creates. (Contrast: spawn knows the id up front and pre- - // registers a placeholder.) - const tentativeWake = { - subscriberUrl: entityUrl, - condition: `runFinished` as const, - includeResponse: true, - } + // Fire-and-forget (`observe: false`): no parent link, no wake, + // no manifest entry. The fork exists as a top-level sibling and + // the caller never hears back about it. Mirrors `spawn`'s + // `observe: false` semantics. + const wakeForRegistration = !observe + ? undefined + : (normalizeWake(callerWake) ?? { + subscriberUrl: entityUrl, + condition: `runFinished` as const, + includeResponse: true, + }) const { entityUrl: forkUrl } = await wiringConfig.forkEntity( targetEntityUrl, { - parent: entityUrl, - wake: tentativeWake, + ...(observe && { parent: entityUrl }), + ...(wakeForRegistration && { wake: wakeForRegistration }), + ...(initialMessage !== undefined && { initialMessage }), + ...(tags && { tags }), } ) - const segments = forkUrl.split(`/`).filter(Boolean) - const forkType = segments[0] ?? `unknown` - const forkId = segments[1] ?? forkUrl - const childKey = manifestChildKey(forkType, forkId) - // Patch the registered wake's manifestKey now that we know the - // assigned URL — the server-side registration doesn't include it - // because we didn't have it pre-fork. Server-side cleanup paths - // already match wakes by subscriberUrl + sourceUrl, so leaving - // manifestKey unset is safe; we still mirror the value into the - // manifest entry for parity with spawn's bookkeeping. - wakeSession.registerManifestEntry({ - kind: `child`, - key: childKey, - id: forkId, - entity_type: forkType, - entity_url: forkUrl, - observed: true, - wake: { on: `runFinished`, includeResponse: true }, - }) + if (observe) { + const segments = forkUrl.split(`/`).filter(Boolean) + const forkType = segments[0] ?? `unknown` + const forkId = segments[1] ?? forkUrl + const childKey = manifestChildKey(forkType, forkId) + // Mirror the wake into the manifest entry for parity with + // spawn's bookkeeping — the server-side wake registry is the + // authoritative delivery mechanism; this keeps the parent's + // own state machine aware of the relationship across wakes. + const manifestWake = callerWake ?? { + on: `runFinished` as const, + includeResponse: true, + } + wakeSession.registerManifestEntry({ + kind: `child`, + key: childKey, + id: forkId, + entity_type: forkType, + entity_url: forkUrl, + observed: true, + wake: manifestWake, + }) + } return { url: forkUrl } } + // Translate a user-facing `Wake` (literal `runFinished` or + // structured) into the wakeRegistry-compatible shape used by + // `wiring.forkEntity`'s `wake` field. Same translation + // createOrGetChild does for spawn (process-wake.ts:1272-1295). + type ForkWake = NonNullable< + NonNullable[1]>[`wake`] + > + function normalizeWake(wake: Wake | undefined): ForkWake | undefined { + if (wake === undefined) return undefined + const isRunFinished = + wake === `runFinished` || + (typeof wake === `object` && wake.on === `runFinished`) + const condition: ForkWake[`condition`] = isRunFinished + ? `runFinished` + : (wake as Exclude) + const result: ForkWake = { + subscriberUrl: entityUrl, + condition, + } + if (typeof wake === `object` && wake.on === `runFinished`) { + if (wake.includeResponse !== undefined) { + result.includeResponse = wake.includeResponse + } + } + if (typeof wake === `object` && wake.on === `change`) { + if (wake.debounceMs !== undefined) result.debounceMs = wake.debounceMs + if (wake.timeoutMs !== undefined) result.timeoutMs = wake.timeoutMs + } + return result + } + const doMkdb = ( id: string, schema: TSchema diff --git a/packages/agents-runtime/src/runtime-server-client.ts b/packages/agents-runtime/src/runtime-server-client.ts index 1e33195e71..98590732b1 100644 --- a/packages/agents-runtime/src/runtime-server-client.ts +++ b/packages/agents-runtime/src/runtime-server-client.ts @@ -126,11 +126,15 @@ export interface RuntimeServerClient { * Resolves to the new root entity's info. Wraps the agents-server * `POST /_electric/entities///fork` endpoint. * - * Optional `parent` makes the new fork a child of that URL; pair with - * `wake` to register a subscription on the new fork at fork time - * (mirrors the spawn flow's `parent` + `wake` plumbing). Reply - * delivery to the parent uses the same manifest-anchored wake the - * spawn flow uses. + * Optional fields mirror `spawnEntity`: + * - `parent` makes the new fork a child of that URL. + * - `wake` registers a subscription at fork time (reply delivery + * uses the parent's manifest-anchored wake when paired with a + * manifest entry on the parent — same model as `spawn`). + * - `initialMessage` delivers an inbox message to the new fork + * atomically with creation, folding fork+send into one round-trip. + * - `tags` stamps tags onto the new fork in addition to those + * copied from the source. */ forkEntity: (options: { sourceEntityUrl: string @@ -143,6 +147,8 @@ export interface RuntimeServerClient { includeResponse?: boolean manifestKey?: string } + initialMessage?: unknown + tags?: Record }) => Promise getEntity: (entityUrl: string) => Promise ensureSharedStateStream: ( @@ -476,6 +482,8 @@ export function createRuntimeServerClient( sourceEntityUrl, parent, wake, + initialMessage, + tags, }: { sourceEntityUrl: string parent?: string @@ -487,12 +495,16 @@ export function createRuntimeServerClient( includeResponse?: boolean manifestKey?: string } + initialMessage?: unknown + tags?: Record }): Promise => { const body: Record = { anchor: `latest_completed_run`, } if (parent !== undefined) body.parent = parent if (wake !== undefined) body.wake = wake + if (initialMessage !== undefined) body.initialMessage = initialMessage + if (tags !== undefined) body.tags = tags const response = await request(entityRpcPath(sourceEntityUrl, `/fork`), { method: `POST`, headers: { 'content-type': `application/json` }, diff --git a/packages/agents-runtime/src/setup-context.ts b/packages/agents-runtime/src/setup-context.ts index 93ee3268b7..70a6b63833 100644 --- a/packages/agents-runtime/src/setup-context.ts +++ b/packages/agents-runtime/src/setup-context.ts @@ -98,6 +98,8 @@ export interface WiringConfig { includeResponse?: boolean manifestKey?: string } + initialMessage?: unknown + tags?: Record } ) => Promise<{ entityUrl: string; streamPath: string }> /** Create a child StreamDB, preload it, and register it for cleanup. */ diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index bb3e67e2e9..2f0f515327 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -997,12 +997,22 @@ export interface HandlerContext< * mechanism `spawn` uses — so when the fork's next run finishes, * this entity wakes with the response in the wake message. * - * `opts` is an options bag (rather than a positional `targetEntityUrl` - * arg) so future knobs — e.g. an explicit-anchor parameter — can be - * added without making callers thread `undefined` through earlier - * positions. + * Options mirror `spawn` where the semantics map: + * - `initialMessage` is delivered to the fork's inbox atomically + * with creation, folding the fork+send pattern into one call. + * - `wake` overrides the default `runFinished + includeResponse` + * subscription (e.g. to set debounce for high-fanout forking). + * - `tags` are stamped on top of the tags copied from the source. + * - `observe: false` opts out of the parent relationship entirely: + * no wake, no manifest entry, no reply path — fire-and-forget. */ - fork: (opts?: { targetEntityUrl?: string }) => Promise<{ url: string }> + fork: (opts?: { + targetEntityUrl?: string + initialMessage?: unknown + wake?: Wake + tags?: Record + observe?: boolean + }) => Promise<{ url: string }> observe: (( source: ObservationSource & { sourceType: `entity` }, opts?: { wake?: Wake } diff --git a/packages/agents-server/src/entity-manager.ts b/packages/agents-server/src/entity-manager.ts index 56b99a3490..ec4b2f1ae0 100644 --- a/packages/agents-server/src/entity-manager.ts +++ b/packages/agents-server/src/entity-manager.ts @@ -200,6 +200,11 @@ type ForkSubtreeOptions = { * to wire reply delivery via the parent's manifest-anchored wake. */ wake?: TypedSpawnRequest[`wake`] + /** + * Optional tags stamped onto the new root fork entity in addition + * to those copied from the source. Mirrors `spawn`'s `tags`. + */ + tags?: Record } type ForkEntityPlan = { @@ -1111,14 +1116,25 @@ export class EntityManager { stringMap ) - // Override the new root fork's parent when the caller asked for a - // child fork. Plumbed through from `forkBodySchema.parent`; the - // descendants in `effectiveSubtree` keep their original parent - // links (remapped via `entityUrlMap` in buildForkEntityPlans). - if (opts.parent !== undefined) { - const rootPlan = entityPlans.find((plan) => plan.source.url === rootUrl) - if (rootPlan) { - rootPlan.fork.parent = opts.parent + // Override fields on the new root fork's plan from caller opts. + // Plumbed through from `forkBodySchema` — descendants in + // `effectiveSubtree` keep what `buildForkEntityPlans` copied. + // + // - `parent`: child-fork relationship (vs the default inheritance + // from the source, which is `null` for a top-level source). + // - `tags`: merged on top of source tags (caller-supplied wins). + const rootPlanForOverrides = entityPlans.find( + (plan) => plan.source.url === rootUrl + ) + if (rootPlanForOverrides) { + if (opts.parent !== undefined) { + rootPlanForOverrides.fork.parent = opts.parent + } + if (opts.tags !== undefined) { + rootPlanForOverrides.fork.tags = { + ...(rootPlanForOverrides.fork.tags ?? {}), + ...opts.tags, + } } } diff --git a/packages/agents-server/src/routing/entities-router.ts b/packages/agents-server/src/routing/entities-router.ts index 81bd1685ad..8b91e5624e 100644 --- a/packages/agents-server/src/routing/entities-router.ts +++ b/packages/agents-server/src/routing/entities-router.ts @@ -221,6 +221,15 @@ const forkBodySchema = Type.Object({ manifestKey: Type.Optional(Type.String()), }) ), + // Optional initial inbox message delivered to the new root fork + // immediately after creation. Atomic with the fork RPC — folds the + // common "fork then send" pattern into a single round-trip and means + // a partial failure can't leave an idle fork on the parent's + // manifest. Mirrors spawn's `initialMessage` field. + initialMessage: Type.Optional(Type.Unknown()), + // Optional tags stamped on the new root fork entity in addition to + // those copied from the source. Mirrors spawn's `tags`. + tags: Type.Optional(stringRecordSchema), }) const setTagBodySchema = Type.Object({ @@ -1111,10 +1120,21 @@ async function forkEntity( ...(parsed.anchor && { anchor: parsed.anchor }), ...(parsed.parent !== undefined && { parent: parsed.parent }), ...(parsed.wake !== undefined && { wake: parsed.wake }), + ...(parsed.tags !== undefined && { tags: parsed.tags }), }) for (const forkedEntity of result.entities) { await linkEntityDispatchSubscription(ctx, forkedEntity) } + // Deliver the initial message via entityManager.send AFTER the + // dispatch subscription is linked — same ordering spawn uses. Sending + // before linking would land the inbox row on the stream before the + // dispatcher is subscribed, and the dispatcher would never pick it up. + if (parsed.initialMessage !== undefined) { + await ctx.entityManager.send(result.root.url, { + from: parsed.parent ?? ctx.principal.url, + payload: parsed.initialMessage, + }) + } return json( { root: toPublicEntity(result.root), diff --git a/packages/agents/src/agents/horton.ts b/packages/agents/src/agents/horton.ts index eae36ba572..2a4d8070cd 100644 --- a/packages/agents/src/agents/horton.ts +++ b/packages/agents/src/agents/horton.ts @@ -279,10 +279,9 @@ After spawning, end your turn (optionally with a brief "I've dispatched a worker Workflow when forking yourself for parallel exploration: 1. **End your current turn first.** The fork's history stops at your *latest completed* run. Anything you say mid-turn is NOT in the fork. If you want your analysis baked into each fork, finish it and end the turn before calling fork. -2. On the next wake, call \`fork\` once per branch. Each fork is YOUR child, just like a spawned worker. -3. For each fork URL the tool returned, use \`send\` to deliver a *different* follow-up prompt — that's how the branches diverge from a shared starting point. -4. End your turn. You'll wake automatically when each fork's run finishes (same wake mechanism as spawn_worker); the wake message identifies the fork and includes its response. -5. If you're waiting on multiple forks, don't synthesize on the first wake — quietly end the turn with "got N of M, waiting" until you have what you need to compare. +2. On the next wake, call \`fork\` once per branch with a different \`initialMessage\` per call — that's how the branches diverge from a shared starting point. Each fork is YOUR child, just like a spawned worker, and the \`initialMessage\` is delivered to it atomically with creation, so the fork starts running immediately (no follow-up \`send\` needed). Use the shape \`{ text: "..." }\` for the message so it renders in the chat UI. +3. End your turn. You'll wake automatically when each fork's run finishes (same wake mechanism as spawn_worker); the wake message identifies the fork and includes its response. +4. If you're waiting on multiple forks, don't synthesize on the first wake — quietly end the turn with "got N of M, waiting" until you have what you need to compare. # Reporting Report outcomes faithfully. If a command failed, say so with the relevant output. If you didn't run a verification step, say that rather than implying you did. Don't hedge confirmed results with unnecessary disclaimers. diff --git a/packages/agents/src/tools/fork.ts b/packages/agents/src/tools/fork.ts index 11460f257c..c8c9f42602 100644 --- a/packages/agents/src/tools/fork.ts +++ b/packages/agents/src/tools/fork.ts @@ -9,7 +9,7 @@ export function createForkTool(ctx: HandlerContext): AgentTool { label: `Fork`, description: `Fork a session at its latest completed agent response, producing a child copy of the conversation up to that point. The new fork is YOUR child — same parent-ownership model as a spawned worker — and it reports back to you the same way: when its next run finishes you'll be woken with its response. End your turn after forking. -The fork boots idle. Use the existing 'send' tool to dispatch a follow-up prompt to it; the fork will run, finish, and you'll wake with the response just like with spawn_worker. +Prefer supplying an 'initialMessage' so the fork is dispatched immediately in a single call — no follow-up 'send' needed. If you omit it, the fork boots idle and you'll need to call 'send' afterwards. For chat-rendered messages use the shape \`{ "text": "..." }\` so the prompt shows up in the chat UI. Use this to explore multiple alternative continuations in parallel from the same starting point. End your current turn first so the fork includes your latest response — the anchor is always the most recently completed run. @@ -20,18 +20,38 @@ Omit 'entityUrl' to fork your own session. Pass a different session's URL to for description: `URL of the session to fork. Omit to fork your own session.`, }) ), + initialMessage: Type.Optional( + Type.Any({ + description: `Initial inbox message delivered to the fork atomically with creation — the fork wakes and starts running immediately. Use the shape \`{ "text": "..." }\` for chat-rendered prompts. Omit to leave the fork idle (then call 'send' separately).`, + }) + ), + tags: Type.Optional( + Type.Record(Type.String(), Type.String(), { + description: `Optional tags stamped on the new fork, on top of those copied from the source. Useful for labelling experiments (e.g. \`{ "experiment": "ecosystem-maturity" }\`).`, + }) + ), }), execute: async (_toolCallId, params) => { - const { entityUrl } = params as { entityUrl?: string } + const { entityUrl, initialMessage, tags } = params as { + entityUrl?: string + initialMessage?: unknown + tags?: Record + } try { - const { url } = await ctx.fork( - entityUrl !== undefined ? { targetEntityUrl: entityUrl } : undefined - ) + const { url } = await ctx.fork({ + ...(entityUrl !== undefined && { targetEntityUrl: entityUrl }), + ...(initialMessage !== undefined && { initialMessage }), + ...(tags !== undefined && { tags }), + }) + const dispatchNote = + initialMessage !== undefined + ? `The initial message has been delivered to the fork — it will start running.` + : `The fork boots idle — use the 'send' tool to dispatch a follow-up prompt.` return { content: [ { type: `text` as const, - text: `Forked at ${url}. The fork is your child and boots idle — use the 'send' tool to dispatch a follow-up prompt. End your turn afterwards; you'll wake with the fork's response when its next run finishes (same as a spawned worker).`, + text: `Forked at ${url}. ${dispatchNote} End your turn; you'll wake with the fork's response when its next run finishes (same as a spawned worker).`, }, ], details: { forked: true, forkUrl: url }, From c700742ada154fc75990038f0bdaae36aa0c5d17 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 4 Jun 2026 12:06:57 +0200 Subject: [PATCH 05/10] test(fork): cover the anchor path + wake-rollback + route forwarding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses Sam's test-coverage ask on the PR. Four focused tests for the fork-tool surface this PR added: - `anchor 'latest_completed_run' resolves to a fork pointer at the latest completed run` — feeds a stream of events covering an idle → started → completed → started cycle and asserts the pointer fed to streamClient.fork is `{ offset: , subOffset: 1 }` on the completed-row entry, matching the runtime's onBeforeBatch pointer-minting convention. - `anchor 'latest_completed_run' rejects when the source has no completed run` — same setup but with only `started`/no `completed` rows. Expects a 400 with a "no completed run" message. - `rolls back the fork when wake registration fails` — wakeRegistry .register throws, forkSubtree rejects, and the rollback path deletes the newly-created root fork entity + its `main`/`error` streams. The source root is left intact. - `forwards anchor, parent, wake, initialMessage, and tags through to forkSubtree (and sends the initial message)` — HTTP route test asserting (a) the new body fields land in the right places in forkSubtree's options, (b) `initialMessage` is NOT passed into forkSubtree but instead delivered via `entityManager.send` after the dispatch link, mirroring spawn's pattern. - (Bonus) `rejects when fork_pointer and anchor are both present` — the mutual-exclusion validation also gets coverage. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../test/electric-agents-routes.test.ts | 89 ++++++ .../test/electric-agents-status.test.ts | 257 ++++++++++++++++++ 2 files changed, 346 insertions(+) diff --git a/packages/agents-server/test/electric-agents-routes.test.ts b/packages/agents-server/test/electric-agents-routes.test.ts index 608ab648a3..de43a1df86 100644 --- a/packages/agents-server/test/electric-agents-routes.test.ts +++ b/packages/agents-server/test/electric-agents-routes.test.ts @@ -997,4 +997,93 @@ describe(`ElectricAgentsRoutes fork endpoint`, () => { expect(payload.root).not.toHaveProperty(`write_token`) expect(payload.root).not.toHaveProperty(`subscription_id`) }) + + it(`forwards anchor, parent, wake, initialMessage, and tags through to forkSubtree (and sends the initial message)`, async () => { + const forkedRoot = { + url: `/chat/root-copy`, + type: `chat`, + status: `idle`, + streams: { + main: `/chat/root-copy/main`, + error: `/chat/root-copy/error`, + }, + subscription_id: `chat-handler`, + write_token: `secret-token`, + tags: { experiment: `ecosystem-maturity` }, + spawn_args: {}, + created_at: 1, + updated_at: 1, + } + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/root` }), + getEntityType: vi.fn(), + }, + forkSubtree: vi.fn().mockResolvedValue({ + root: forkedRoot, + entities: [forkedRoot], + }), + send: vi.fn().mockResolvedValue(undefined), + } as any + + const wake = { + subscriberUrl: `/chat/parent`, + condition: `runFinished` as const, + includeResponse: true, + } + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/root/fork`, + { + anchor: `latest_completed_run`, + parent: `/chat/parent`, + wake, + initialMessage: { text: `hello fork` }, + tags: { experiment: `ecosystem-maturity` }, + } + ) + + expect(response.status).toBe(201) + expect(manager.forkSubtree).toHaveBeenCalledWith(`/chat/root`, { + rootInstanceId: undefined, + waitTimeoutMs: undefined, + anchor: `latest_completed_run`, + parent: `/chat/parent`, + wake, + tags: { experiment: `ecosystem-maturity` }, + }) + // initialMessage is NOT passed into forkSubtree — it's delivered + // via entityManager.send after linkEntityDispatchSubscription, the + // same ordering spawn uses. Verify the send happened against the + // new root fork with the parent as `from`. + expect(manager.send).toHaveBeenCalledWith(`/chat/root-copy`, { + from: `/chat/parent`, + payload: { text: `hello fork` }, + }) + }) + + it(`rejects when fork_pointer and anchor are both present`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/root` }), + getEntityType: vi.fn(), + }, + forkSubtree: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/root/fork`, + { + anchor: `latest_completed_run`, + fork_pointer: { offset: `abc`, sub_offset: 1 }, + } + ) + + expect(response.status).toBe(400) + expect(manager.forkSubtree).not.toHaveBeenCalled() + }) }) diff --git a/packages/agents-server/test/electric-agents-status.test.ts b/packages/agents-server/test/electric-agents-status.test.ts index 37ddf60c2f..582105f241 100644 --- a/packages/agents-server/test/electric-agents-status.test.ts +++ b/packages/agents-server/test/electric-agents-status.test.ts @@ -688,4 +688,261 @@ describe(`ElectricAgentsManager.forkSubtree`, () => { expect.any(Uint8Array) ) }) + + // Helper for the anchor / wake-rollback tests: builds a streamClient + // backed by a caller-supplied event list, captures every `fork` call's + // opts so the test can assert the resolved pointer, and tracks + // appends + deletes for rollback assertions. + function makeStreamClient(events: Array>) { + const forkCalls: Array<{ + forkPath: string + sourcePath: string + opts?: Record + }> = [] + const deleted: Array = [] + return { + forkCalls, + deleted, + client: { + readJson: vi.fn().mockResolvedValue(events), + exists: vi.fn().mockResolvedValue(false), + fork: vi.fn( + async ( + forkPath: string, + sourcePath: string, + opts?: Record + ) => { + forkCalls.push({ forkPath, sourcePath, opts }) + } + ), + append: vi.fn().mockResolvedValue({ offset: `1` }), + delete: vi.fn(async (path: string) => { + deleted.push(path) + }), + }, + } + } + + it(`anchor 'latest_completed_run' resolves to a fork pointer at the latest completed run`, async () => { + const root = makeEntity(`/chat/anchor-src`) + // Stream layout (one event per log entry, offsets monotonically + // increasing). The runtime mints pointers as + // { offset: , subOffset: <1-based position> } + // So the latest completed run is at offset "D" (run-0 update) and + // the expected pointer is { offset: "C", subOffset: 1 } — "C" is + // the entry-offset of the row before the target (the started-run + // row), and subOffset is 1 since it's the only row in entry "D". + const events = [ + { + type: `entity_created`, + key: `entity-created`, + headers: { operation: `insert`, offset: `A` }, + value: {}, + }, + { + type: `inbox`, + key: `msg-1`, + headers: { operation: `insert`, offset: `B` }, + value: { from: `user`, payload: `hello` }, + }, + { + type: `run`, + key: `run-0`, + headers: { operation: `insert`, offset: `C` }, + value: { status: `started` }, + }, + { + type: `run`, + key: `run-0`, + headers: { operation: `update`, offset: `D` }, + value: { status: `completed` }, + }, + // Subsequent activity past the latest-completed anchor — the + // resolver should still point at the row at offset "D". + { + type: `inbox`, + key: `msg-2`, + headers: { operation: `insert`, offset: `E` }, + value: { from: `user`, payload: `again` }, + }, + { + type: `run`, + key: `run-1`, + headers: { operation: `insert`, offset: `F` }, + value: { status: `started` }, + }, + ] + const { forkCalls, client } = makeStreamClient(events) + const entitiesByUrl = new Map([[root.url, root]]) + + const manager = new EntityManager({ + registry: { + getEntity: vi.fn(async (url: string) => entitiesByUrl.get(url) ?? null), + listEntities: vi.fn().mockResolvedValue({ entities: [], total: 0 }), + createEntity: vi.fn(async (entity: any) => { + entitiesByUrl.set(entity.url, entity) + return 1 + }), + deleteEntity: vi.fn(), + replaceEntityManifestSource: vi.fn(), + } as any, + streamClient: client as any, + validator: {} as any, + wakeRegistry: { + register: vi.fn(), + unregisterBySubscriber: vi.fn(), + unregisterBySource: vi.fn(), + setTimeoutCallback: vi.fn(), + setDebounceCallback: vi.fn(), + } as any, + }) + + await manager.forkSubtree(root.url, { + rootInstanceId: `anchor-fork`, + anchor: `latest_completed_run`, + waitTimeoutMs: 0, + }) + + const rootForkCall = forkCalls.find( + (call) => call.sourcePath === root.streams.main + ) + expect(rootForkCall).toBeDefined() + expect(rootForkCall?.opts).toEqual({ + forkPointer: { offset: `C`, subOffset: 1 }, + }) + }) + + it(`anchor 'latest_completed_run' rejects when the source has no completed run`, async () => { + const root = makeEntity(`/chat/no-completion`) + const events = [ + { + type: `entity_created`, + key: `entity-created`, + headers: { operation: `insert`, offset: `A` }, + value: {}, + }, + { + type: `inbox`, + key: `msg-1`, + headers: { operation: `insert`, offset: `B` }, + value: { from: `user`, payload: `hi` }, + }, + // Only `started` — no `completed` row anywhere. + { + type: `run`, + key: `run-0`, + headers: { operation: `insert`, offset: `C` }, + value: { status: `started` }, + }, + ] + const { client } = makeStreamClient(events) + const entitiesByUrl = new Map([[root.url, root]]) + + const manager = new EntityManager({ + registry: { + getEntity: vi.fn(async (url: string) => entitiesByUrl.get(url) ?? null), + listEntities: vi.fn().mockResolvedValue({ entities: [], total: 0 }), + createEntity: vi.fn(), + deleteEntity: vi.fn(), + replaceEntityManifestSource: vi.fn(), + } as any, + streamClient: client as any, + validator: {} as any, + wakeRegistry: { + register: vi.fn(), + unregisterBySubscriber: vi.fn(), + unregisterBySource: vi.fn(), + setTimeoutCallback: vi.fn(), + setDebounceCallback: vi.fn(), + } as any, + }) + + await expect( + manager.forkSubtree(root.url, { + rootInstanceId: `no-anchor-fork`, + anchor: `latest_completed_run`, + waitTimeoutMs: 0, + }) + ).rejects.toMatchObject({ + status: 400, + message: expect.stringContaining(`no completed run`), + }) + }) + + it(`rolls back the fork when wake registration fails`, async () => { + const root = makeEntity(`/chat/rollback-src`) + const events = [ + { + type: `entity_created`, + key: `entity-created`, + headers: { operation: `insert`, offset: `A` }, + value: {}, + }, + { + type: `run`, + key: `run-0`, + headers: { operation: `insert`, offset: `B` }, + value: { status: `started` }, + }, + { + type: `run`, + key: `run-0`, + headers: { operation: `update`, offset: `C` }, + value: { status: `completed` }, + }, + ] + const { client, deleted } = makeStreamClient(events) + const entitiesByUrl = new Map([[root.url, root]]) + const deletedEntities: Array = [] + + const manager = new EntityManager({ + registry: { + getEntity: vi.fn(async (url: string) => entitiesByUrl.get(url) ?? null), + listEntities: vi.fn().mockResolvedValue({ entities: [], total: 0 }), + createEntity: vi.fn(async (entity: any) => { + entitiesByUrl.set(entity.url, entity) + return 1 + }), + deleteEntity: vi.fn(async (url: string) => { + deletedEntities.push(url) + entitiesByUrl.delete(url) + }), + replaceEntityManifestSource: vi.fn(), + } as any, + streamClient: client as any, + validator: {} as any, + wakeRegistry: { + register: vi.fn().mockRejectedValue(new Error(`wake register boom`)), + unregisterBySubscriber: vi.fn(), + unregisterBySource: vi.fn(), + setTimeoutCallback: vi.fn(), + setDebounceCallback: vi.fn(), + } as any, + }) + + await expect( + manager.forkSubtree(root.url, { + rootInstanceId: `rollback-fork`, + anchor: `latest_completed_run`, + waitTimeoutMs: 0, + parent: `/chat/parent`, + wake: { + subscriberUrl: `/chat/parent`, + condition: `runFinished`, + includeResponse: true, + }, + }) + ).rejects.toThrow(`wake register boom`) + + // The new root fork entity was created then deleted; both of its + // streams were forked then deleted; the source root is untouched. + expect(deletedEntities).toContain(`/chat/rollback-fork`) + expect(deleted).toEqual( + expect.arrayContaining([ + `/chat/rollback-fork/main`, + `/chat/rollback-fork/error`, + ]) + ) + expect(deletedEntities).not.toContain(root.url) + }) }) From 0cd6b23d2ed3431c6c9aa40eb776361c329e68f0 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 4 Jun 2026 12:49:11 +0200 Subject: [PATCH 06/10] test(rebase): align pre-existing test mocks with main's permission + registry changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After rebasing on top of main, two main-side changes that landed since this branch's base broke a handful of tests that were green at branch time. Aligning the mocks so CI doesn't false-alarm on this PR. 1. `add permission enforcement` (#4475) made the spawn route 401 for any non-bypass principal that doesn't hold the required entity-type grant. The runtime-dsl test framework was using `system:runtime-dsl-test`, which isn't a built-in bypass — switch to `system:dev-local` (the same bypass principal the desktop's local runtime uses). 70 snapshots auto-regenerated; the diff is purely the `from` field, nothing semantic. The dispatch-policy- routing test setup gets the same swap, plus its mocked runner's `owner_principal` flipped to match (so the `assertDispatchPolicyAllowed` owner check passes). 2. A new `registry.replaceSharedStateLink` call landed in `syncManifestLinks` without updating the registry mocks in agents-server's status / write-validation / server-start tests. Added `replaceSharedStateLink: vi.fn()` (or a no-op method on the fake registry class) to each. None of these tests are exercised by the recent main commits' CI matrix (TS tests are scoped by affected workspace, and main has only touched agents-desktop / agents-mobile recently), which is why they appear to "pass on main" but red on this PR's CI. The fixes above keep them passing and let our actual change get a clean signal. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../test/dispatch-policy-routing.test.ts | 19 ++++++++++++------- .../test/electric-agents-status.test.ts | 3 +++ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/packages/agents-server/test/dispatch-policy-routing.test.ts b/packages/agents-server/test/dispatch-policy-routing.test.ts index 21792b7c5b..0aab3f4fd1 100644 --- a/packages/agents-server/test/dispatch-policy-routing.test.ts +++ b/packages/agents-server/test/dispatch-policy-routing.test.ts @@ -43,11 +43,16 @@ function buildContext(overrides: Partial = {}): TenantContext { } return { service: `tenant-test`, + // dev-local built-in system principal bypasses permission checks + // (these tests assert dispatch wiring, not authz; permission + // enforcement landed on main after this test was last touched). + // The runner mock below has its owner_principal set to the same + // url so assertDispatchPolicyAllowed's owner check passes too. principal: { - kind: `user`, - id: `owner@example.com`, - key: `user:owner@example.com`, - url: `/principal/user%3Aowner%40example.com`, + kind: `system`, + id: `dev-local`, + key: `system:dev-local`, + url: `/principal/system:dev-local`, }, publicUrl: `http://server`, durableStreamsUrl: `http://durable.local`, @@ -72,7 +77,7 @@ function buildContext(overrides: Partial = {}): TenantContext { ), getRunner: vi.fn(async () => ({ id: `runner-1`, - owner_principal: `/principal/user%3Aowner%40example.com`, + owner_principal: `/principal/system:dev-local`, label: `Local runner`, kind: `local`, admin_status: `enabled`, @@ -212,7 +217,7 @@ describe(`dispatch policy routing`, () => { }) ) expect(ctx.entityManager.send).toHaveBeenCalledWith(`/chat/one`, { - from: `/principal/user%3Aowner%40example.com`, + from: `/principal/system:dev-local`, payload: `hello`, }) expect(ctx.streamClient.putSubscription).toHaveBeenCalledWith( @@ -252,7 +257,7 @@ describe(`dispatch policy routing`, () => { }) ) expect(ctx.entityManager.send).toHaveBeenCalledWith(`/chat/one`, { - from: `/principal/user%3Aowner%40example.com`, + from: `/principal/system:dev-local`, payload: `hello`, }) expect( diff --git a/packages/agents-server/test/electric-agents-status.test.ts b/packages/agents-server/test/electric-agents-status.test.ts index 582105f241..8cdc19a181 100644 --- a/packages/agents-server/test/electric-agents-status.test.ts +++ b/packages/agents-server/test/electric-agents-status.test.ts @@ -785,6 +785,7 @@ describe(`ElectricAgentsManager.forkSubtree`, () => { }), deleteEntity: vi.fn(), replaceEntityManifestSource: vi.fn(), + replaceSharedStateLink: vi.fn(), } as any, streamClient: client as any, validator: {} as any, @@ -845,6 +846,7 @@ describe(`ElectricAgentsManager.forkSubtree`, () => { createEntity: vi.fn(), deleteEntity: vi.fn(), replaceEntityManifestSource: vi.fn(), + replaceSharedStateLink: vi.fn(), } as any, streamClient: client as any, validator: {} as any, @@ -908,6 +910,7 @@ describe(`ElectricAgentsManager.forkSubtree`, () => { entitiesByUrl.delete(url) }), replaceEntityManifestSource: vi.fn(), + replaceSharedStateLink: vi.fn(), } as any, streamClient: client as any, validator: {} as any, From 99e23d5a0d811116d443448d70ad75d5ef4bd861 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 4 Jun 2026 13:11:57 +0200 Subject: [PATCH 07/10] test(rebase): bypass permission check in event-source subscription route tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same pattern as the previous rebase-fixup commit: another test file landed on main with a non-bypass principal (`system:test`), and the new permission-enforcement code (#4475) now reaches for `registry.hasEntityPermission` via canAccessEntity — which the test's mock registry doesn't expose. Switching the principal to `system:dev-local` (a built-in bypass) sidesteps the entity-permission check, same way the desktop's local runtime and the dispatch-policy-routing tests do. These tests assert subscription routing, not authz, so the bypass is the minimal fix and matches the convention established in the previous commit on this branch. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../test/event-source-subscriptions-route.test.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/agents-server/test/event-source-subscriptions-route.test.ts b/packages/agents-server/test/event-source-subscriptions-route.test.ts index d89899fda1..7a477f90f3 100644 --- a/packages/agents-server/test/event-source-subscriptions-route.test.ts +++ b/packages/agents-server/test/event-source-subscriptions-route.test.ts @@ -178,11 +178,16 @@ function tenantContext( } return { service: `svc-agent-1`, + // dev-local is a built-in bypass principal (permissions.ts / + // isBuiltInSystemPrincipalUrl). These tests assert subscription + // routing, not authz — and permission enforcement landed on main + // after they were written, so the registry mocks here don't have + // the entity-permission methods spawned. Bypass is the minimal fix. principal: { kind: `system`, - id: `test`, - key: `system:test`, - url: `/principal/system%3Atest`, + id: `dev-local`, + key: `system:dev-local`, + url: `/principal/system:dev-local`, }, publicUrl: `http://agents.test`, durableStreamsUrl: `http://streams.test/v1/stream/svc-agent-1`, From 052c6bc076805ebf1dd829eb0f1e88302f22b609 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 4 Jun 2026 16:21:49 +0200 Subject: [PATCH 08/10] refactor(fork): split ctx.fork(opts) into fork(url, id, opts?) + forkSelf(id, opts?) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Aligns ctx.fork with ctx.spawn's positional shape: ctx.spawn(type, id, args?, opts?) ctx.fork(sourceEntityUrl, id, opts?) ctx.forkSelf(id, opts?) // convenience for sourceEntityUrl=self Both fork methods now require `id` (mirroring spawn). `id` maps to the server's `instance_id` body field and gives callers two things spawn already provides: - Idempotency on retry: same id → server-side deduplication (no duplicate forks on a transient network error). - Predictable URLs: caller knows `/horton/` before the request returns; no need to wait for the response to learn the new URL. The model-facing surface stays simple: the `fork` tool's `id` param remains optional and is auto-generated via `nanoid(10)` when the model doesn't supply one — same pattern createSpawnWorkerTool uses for the worker id. So the model still calls `fork({ initialMessage: ... })` without ceremony; the tool always satisfies the library API's id requirement. Also introduces `ForkOptions` (initialMessage, wake, tags, observe) as a named exported type, mirroring how spawn's opts bag looks. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../agents-runtime/src/context-factory.ts | 38 +++++------ packages/agents-runtime/src/process-wake.ts | 27 ++++---- .../src/runtime-server-client.ts | 5 ++ packages/agents-runtime/src/setup-context.ts | 6 ++ packages/agents-runtime/src/types.ts | 64 +++++++++++++++---- packages/agents/src/tools/fork.ts | 26 ++++++-- 6 files changed, 109 insertions(+), 57 deletions(-) diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index bbbc2d459a..ee926359c0 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -29,6 +29,7 @@ import type { EntitySignal, EntityHandle, EntityStreamDBWithActions, + ForkOptions, HandlerContext, LLMContentBlock, LLMMessage, @@ -102,13 +103,11 @@ export interface HandlerContextConfig { observe?: boolean } ) => Promise - doFork: (opts: { - targetEntityUrl: string - initialMessage?: unknown - wake?: Wake - tags?: Record - observe?: boolean - }) => Promise<{ url: string }> + doFork: ( + sourceEntityUrl: string, + id: string, + opts: ForkOptions + ) => Promise<{ url: string }> doMkdb: ( id: string, schema: TSchema @@ -765,22 +764,15 @@ export function createHandlerContext( ): Promise { return config.doSpawn(type, id, args, opts) }, - fork(opts?: { - targetEntityUrl?: string - initialMessage?: unknown - wake?: Wake - tags?: Record - observe?: boolean - }): Promise<{ url: string }> { - return config.doFork({ - targetEntityUrl: opts?.targetEntityUrl ?? config.entityUrl, - ...(opts?.initialMessage !== undefined && { - initialMessage: opts.initialMessage, - }), - ...(opts?.wake !== undefined && { wake: opts.wake }), - ...(opts?.tags !== undefined && { tags: opts.tags }), - ...(opts?.observe !== undefined && { observe: opts.observe }), - }) + fork( + sourceEntityUrl: string, + id: string, + opts?: ForkOptions + ): Promise<{ url: string }> { + return config.doFork(sourceEntityUrl, id, opts ?? {}) + }, + forkSelf(id: string, opts?: ForkOptions): Promise<{ url: string }> { + return config.doFork(config.entityUrl, id, opts ?? {}) }, mkdb( id: string, diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index 54e959a072..d63442eb98 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -37,6 +37,7 @@ import type { ObservationHandle, ObservationSource, ProcessWakeConfig, + ForkOptions, SendResult, SharedStateSchemaMap, SpawnSandboxOption, @@ -1335,6 +1336,9 @@ export async function processWake( ): Promise<{ entityUrl: string; streamPath: string }> => { const result = await serverClient.forkEntity({ sourceEntityUrl, + ...(opts?.instanceId !== undefined && { + instanceId: opts.instanceId, + }), ...(opts?.parent !== undefined && { parent: opts.parent }), ...(opts?.wake !== undefined && { wake: opts.wake }), ...(opts?.initialMessage !== undefined && { @@ -1766,20 +1770,12 @@ export async function processWake( return setupCtx.spawn(type, id, spawnArgs, opts) } - const doFork = async (opts: { - targetEntityUrl: string - initialMessage?: unknown - wake?: Wake - tags?: Record - observe?: boolean - }): Promise<{ url: string }> => { - const { - targetEntityUrl, - initialMessage, - wake: callerWake, - tags, - observe = true, - } = opts + const doFork = async ( + sourceEntityUrl: string, + id: string, + opts: ForkOptions + ): Promise<{ url: string }> => { + const { initialMessage, wake: callerWake, tags, observe = true } = opts // Child-fork (default `observe: true`): the new fork is created // with `parent = me`, and a `runFinished + includeResponse` wake // is registered on it server-side, mirroring the spawn flow. @@ -1798,8 +1794,9 @@ export async function processWake( includeResponse: true, }) const { entityUrl: forkUrl } = await wiringConfig.forkEntity( - targetEntityUrl, + sourceEntityUrl, { + instanceId: id, ...(observe && { parent: entityUrl }), ...(wakeForRegistration && { wake: wakeForRegistration }), ...(initialMessage !== undefined && { initialMessage }), diff --git a/packages/agents-runtime/src/runtime-server-client.ts b/packages/agents-runtime/src/runtime-server-client.ts index 98590732b1..702d15adae 100644 --- a/packages/agents-runtime/src/runtime-server-client.ts +++ b/packages/agents-runtime/src/runtime-server-client.ts @@ -138,6 +138,8 @@ export interface RuntimeServerClient { */ forkEntity: (options: { sourceEntityUrl: string + /** Maps to the server's `instance_id` body field. */ + instanceId?: string parent?: string wake?: { subscriberUrl: string @@ -480,12 +482,14 @@ export function createRuntimeServerClient( const forkEntity = async ({ sourceEntityUrl, + instanceId, parent, wake, initialMessage, tags, }: { sourceEntityUrl: string + instanceId?: string parent?: string wake?: { subscriberUrl: string @@ -501,6 +505,7 @@ export function createRuntimeServerClient( const body: Record = { anchor: `latest_completed_run`, } + if (instanceId !== undefined) body.instance_id = instanceId if (parent !== undefined) body.parent = parent if (wake !== undefined) body.wake = wake if (initialMessage !== undefined) body.initialMessage = initialMessage diff --git a/packages/agents-runtime/src/setup-context.ts b/packages/agents-runtime/src/setup-context.ts index 70a6b63833..a2a8b295c4 100644 --- a/packages/agents-runtime/src/setup-context.ts +++ b/packages/agents-runtime/src/setup-context.ts @@ -83,6 +83,12 @@ export interface WiringConfig { forkEntity: ( sourceEntityUrl: string, opts?: { + /** + * Caller-supplied instance id for the new fork (wired to the + * server's `instance_id` body field). Omit to let the server + * mint one (currently `-fork-`). + */ + instanceId?: string parent?: string wake?: { subscriberUrl: string diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index 2f0f515327..22e00542f5 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -829,6 +829,28 @@ export type Wake = timeoutMs?: number } +/** + * Options bag for `ctx.fork` / `ctx.forkSelf`. Mirrors `ctx.spawn`'s + * opts shape where the semantics map. + */ +export interface ForkOptions { + /** Inbox message delivered to the new fork atomically with creation. */ + initialMessage?: unknown + /** + * Wake subscription registered on the new fork. Defaults to + * `{ on: 'runFinished', includeResponse: true }` when omitted. + */ + wake?: Wake + /** Tags stamped on the new fork in addition to those copied from the source. */ + tags?: Record + /** + * `false` opts out of the parent relationship entirely (no parent + * link, no wake registration, no manifest entry). Fire-and-forget. + * Mirrors `spawn`'s `observe: false`. Defaults to `true`. + */ + observe?: boolean +} + export type WakeMessage = Omit export type WakeEvent = { @@ -989,13 +1011,23 @@ export interface HandlerContext< ) => Promise /** * Fork a session at the latest completed run on its `main` stream. - * Defaults to this entity (self-fork) when `opts.targetEntityUrl` - * is omitted. The new fork is created as a CHILD of this entity - * (same parent-ownership model as `spawn`), and a `runFinished + - * includeResponse` wake is registered on it at fork time. Reply - * delivery uses the parent's manifest-anchored wake — the same - * mechanism `spawn` uses — so when the fork's next run finishes, - * this entity wakes with the response in the wake message. + * The new fork is created as a CHILD of this entity (same parent- + * ownership model as `spawn`), and a `runFinished + includeResponse` + * wake is registered on it at fork time. Reply delivery uses the + * parent's manifest-anchored wake — the same mechanism `spawn` uses + * — so when the fork's next run finishes, this entity wakes with the + * response in the wake message. + * + * Positional shape matches `ctx.spawn(type, id, args?, opts?)`: + * `ctx.fork(sourceEntityUrl, id, opts?)` + * + * - `sourceEntityUrl` — the entity whose history the fork inherits. + * Use `ctx.forkSelf(...)` to fork yourself (omits this arg). + * - `id` — the new fork's instance id (the `` in `/horton/`). + * Required for the same reason `spawn`'s id is required: the caller + * knows the new URL up front and the request is idempotent on retry + * (same id → server deduplicates). Tool wrappers that target the + * model layer generate this via `nanoid` — see `createForkTool`. * * Options mirror `spawn` where the semantics map: * - `initialMessage` is delivered to the fork's inbox atomically @@ -1006,13 +1038,17 @@ export interface HandlerContext< * - `observe: false` opts out of the parent relationship entirely: * no wake, no manifest entry, no reply path — fire-and-forget. */ - fork: (opts?: { - targetEntityUrl?: string - initialMessage?: unknown - wake?: Wake - tags?: Record - observe?: boolean - }) => Promise<{ url: string }> + fork: ( + sourceEntityUrl: string, + id: string, + opts?: ForkOptions + ) => Promise<{ url: string }> + /** + * Convenience wrapper for the common self-fork case — equivalent to + * `ctx.fork(ctx.entityUrl, id, opts)`. Mirrors `spawn`'s ergonomics + * (where you also don't have to spell out who the parent is). + */ + forkSelf: (id: string, opts?: ForkOptions) => Promise<{ url: string }> observe: (( source: ObservationSource & { sourceType: `entity` }, opts?: { wake?: Wake } diff --git a/packages/agents/src/tools/fork.ts b/packages/agents/src/tools/fork.ts index c8c9f42602..e001bf66d7 100644 --- a/packages/agents/src/tools/fork.ts +++ b/packages/agents/src/tools/fork.ts @@ -1,4 +1,5 @@ import { Type } from '@sinclair/typebox' +import { nanoid } from 'nanoid' import { serverLog } from '../log' import type { AgentTool } from '@mariozechner/pi-agent-core' import type { HandlerContext } from '@electric-ax/agents-runtime' @@ -13,13 +14,18 @@ Prefer supplying an 'initialMessage' so the fork is dispatched immediately in a Use this to explore multiple alternative continuations in parallel from the same starting point. End your current turn first so the fork includes your latest response — the anchor is always the most recently completed run. -Omit 'entityUrl' to fork your own session. Pass a different session's URL to fork that session instead (the new fork is still your child).`, +Omit 'entityUrl' to fork your own session. Pass a different session's URL to fork that session instead (the new fork is still your child). The optional 'id' names the new fork's instance — useful when you want stable, predictable URLs (e.g. labelling branches in a parallel exploration); omit to let the server mint one.`, parameters: Type.Object({ entityUrl: Type.Optional( Type.String({ description: `URL of the session to fork. Omit to fork your own session.`, }) ), + id: Type.Optional( + Type.String({ + description: `Instance id for the new fork (the \`\` in \`/horton/\`). Mirrors spawn_worker's id parameter. Omit to let the server assign one.`, + }) + ), initialMessage: Type.Optional( Type.Any({ description: `Initial inbox message delivered to the fork atomically with creation — the fork wakes and starts running immediately. Use the shape \`{ "text": "..." }\` for chat-rendered prompts. Omit to leave the fork idle (then call 'send' separately).`, @@ -32,17 +38,27 @@ Omit 'entityUrl' to fork your own session. Pass a different session's URL to for ), }), execute: async (_toolCallId, params) => { - const { entityUrl, initialMessage, tags } = params as { + const { entityUrl, id, initialMessage, tags } = params as { entityUrl?: string + id?: string initialMessage?: unknown tags?: Record } try { - const { url } = await ctx.fork({ - ...(entityUrl !== undefined && { targetEntityUrl: entityUrl }), + const opts = { ...(initialMessage !== undefined && { initialMessage }), ...(tags !== undefined && { tags }), - }) + } + // The library API (`ctx.fork` / `ctx.forkSelf`) requires an id + // — same shape as `ctx.spawn(type, id, ...)`. The model layer + // doesn't need to know this; we generate one via nanoid when + // it's not supplied (same pattern `createSpawnWorkerTool` uses + // for the worker's id). + const forkId = id ?? `fork-${nanoid(10)}` + const { url } = + entityUrl !== undefined + ? await ctx.fork(entityUrl, forkId, opts) + : await ctx.forkSelf(forkId, opts) const dispatchNote = initialMessage !== undefined ? `The initial message has been delivered to the fork — it will start running.` From 057223a371f8fbd786da8e22a2ae65da0b856c8b Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 4 Jun 2026 16:22:05 +0200 Subject: [PATCH 09/10] feat(fork-route): validate parent + wake.subscriberUrl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A direct HTTP caller could previously attach a fork under an arbitrary `parent` URL or register a wake firing to an arbitrary subscriber. The agents-runtime always passes its own entity URL for both fields, so this hadn't bitten in practice — but it's a real authz hole on the endpoint itself. Mirroring the spawn route's parent-validation flow: - When `parent` is set, look it up via `registry.getEntity` (404 if it doesn't exist) and run `canAccessEntity(ctx, parent, 'spawn')` (401 if the caller can't attach a child there). - When `wake` is set, `parent` is required (the only sensible target for a fork's wake is its parent — same semantics spawn assumes), and `wake.subscriberUrl` must equal `parent`. This prevents a caller from registering a wake firing to an entity they don't own. If we ever need subscriber-flexibility, this becomes a proper canAccessEntity check on the subscriber URL. Tests cover the three rejection paths: parent not found (404), wake without parent (400), and wake.subscriberUrl mismatch (401). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/routing/entities-router.ts | 42 ++++++++++ .../test/electric-agents-routes.test.ts | 80 +++++++++++++++++++ 2 files changed, 122 insertions(+) diff --git a/packages/agents-server/src/routing/entities-router.ts b/packages/agents-server/src/routing/entities-router.ts index 8b91e5624e..3e0de98246 100644 --- a/packages/agents-server/src/routing/entities-router.ts +++ b/packages/agents-server/src/routing/entities-router.ts @@ -1108,6 +1108,48 @@ async function forkEntity( } const { entityUrl, entity } = requireExistingEntityRoute(request) await assertDispatchPolicyAllowed(ctx, entity.dispatch_policy) + + // Validate `parent` and `wake.subscriberUrl` before forking — mirrors + // the spawn route's parent-validation flow. Without these checks, a + // direct HTTP caller could attach a fork under an arbitrary parent or + // register a wake firing to an arbitrary subscriber. + if (parsed.parent !== undefined) { + const parent = await ctx.entityManager.registry.getEntity(parsed.parent) + if (!parent) { + return apiError(404, ErrCodeNotFound, `Parent entity not found`) + } + if (!(await canAccessEntity(ctx, parent, `spawn`, request as Request))) { + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to spawn children from ${parent.url}` + ) + } + } + if (parsed.wake !== undefined) { + // The only sensible target for a fork's wake is the new fork's + // parent (so the parent gets woken when the fork's run finishes + // — same model as spawn). Require parent + matching subscriber to + // prevent a caller from registering a wake firing to an entity + // they don't own. If subscriber-flexibility ever becomes a real + // use case, replace this with a proper canAccessEntity check on + // the subscriber. + if (parsed.parent === undefined) { + return apiError( + 400, + ErrCodeInvalidRequest, + `wake requires parent (the fork's wake fires to its parent)` + ) + } + if (parsed.wake.subscriberUrl !== parsed.parent) { + return apiError( + 401, + ErrCodeUnauthorized, + `wake.subscriberUrl must match parent` + ) + } + } + const result = await ctx.entityManager.forkSubtree(entityUrl, { rootInstanceId: parsed.instance_id, waitTimeoutMs: parsed.waitTimeoutMs, diff --git a/packages/agents-server/test/electric-agents-routes.test.ts b/packages/agents-server/test/electric-agents-routes.test.ts index de43a1df86..eaafd9092f 100644 --- a/packages/agents-server/test/electric-agents-routes.test.ts +++ b/packages/agents-server/test/electric-agents-routes.test.ts @@ -1086,4 +1086,84 @@ describe(`ElectricAgentsRoutes fork endpoint`, () => { expect(response.status).toBe(400) expect(manager.forkSubtree).not.toHaveBeenCalled() }) + + it(`rejects when parent is set but does not exist`, async () => { + // getEntity returns the source for the source-route lookup but + // null for the parent lookup. Differentiate by URL. + const getEntity = vi.fn(async (url: string) => + url === `/chat/root` ? { url: `/chat/root` } : null + ) + const manager = { + registry: { getEntity, getEntityType: vi.fn() }, + forkSubtree: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/root/fork`, + { + anchor: `latest_completed_run`, + parent: `/chat/missing-parent`, + } + ) + + expect(response.status).toBe(404) + expect(manager.forkSubtree).not.toHaveBeenCalled() + }) + + it(`rejects when wake is set without parent`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/root` }), + getEntityType: vi.fn(), + }, + forkSubtree: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/root/fork`, + { + anchor: `latest_completed_run`, + wake: { + subscriberUrl: `/chat/stranger`, + condition: `runFinished`, + includeResponse: true, + }, + } + ) + + expect(response.status).toBe(400) + expect(manager.forkSubtree).not.toHaveBeenCalled() + }) + + it(`rejects when wake.subscriberUrl does not match parent`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ url: `/chat/root` }), + getEntityType: vi.fn(), + }, + forkSubtree: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/root/fork`, + { + anchor: `latest_completed_run`, + parent: `/chat/parent`, + wake: { + subscriberUrl: `/chat/stranger`, + condition: `runFinished`, + includeResponse: true, + }, + } + ) + + expect(response.status).toBe(401) + expect(manager.forkSubtree).not.toHaveBeenCalled() + }) }) From e7c05a43b839da2cf8d2c7e71716c20dff7aebad Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 4 Jun 2026 16:35:25 +0200 Subject: [PATCH 10/10] feat(fork): return EntityHandle from ctx.fork / ctx.forkSelf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spawn returns `EntityHandle`; fork was returning `{ url: string }`. Now that forks are children with the same parent-ownership model as spawned workers (and they report back via the same manifest-anchored wake), there's no good reason for fork's return type to be poorer than spawn's. Bring them into parity. `setupCtx.fork(sourceUrl, id, opts)` is added alongside `setupCtx.spawn` and mirrors its flow: - Builds an `EntityHandle` with deferred run state. - Registers a spawn handle on `wakeSession` so a `runFinished` wake resolves the handle's `run` promise. - Calls `wiring.forkEntity(...)` to do the server-side fork. - Creates a child `EntityStreamDB` to observe the fork's stream and hooks completed/failed run events back to the same resolver. - Writes a `kind: 'child'` manifest row on this entity's stream (parity with spawn's bookkeeping). - Caches the handle in `observeHandleCache`. `observe: false` falls through the same way spawn's does — no parent, no wake, no manifest, no DB observation; the handle's `.run` / `.text` reject with a clear "opted out" message. `process-wake.ts`'s `doFork` becomes a thin delegate to `setupCtx.fork`, same pattern `doSpawn` uses. The user-facing `Wake` → wakeRegistry-shape translation moves to a `normalizeForkWake` module-level helper that the `wiring.forkEntity` impl calls. Tool surface stays the same — `createForkTool` now uses `handle.entityUrl` to build the response text but the model still just gets a `Forked at .` confirmation; the EntityHandle surface is for programmatic callers. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../agents-runtime/src/context-factory.ts | 6 +- packages/agents-runtime/src/process-wake.ts | 129 +++------ packages/agents-runtime/src/setup-context.ts | 272 +++++++++++++++++- packages/agents-runtime/src/types.ts | 4 +- ...time-server-client-update-metadata.test.ts | 2 +- packages/agents/src/tools/fork.ts | 6 +- 6 files changed, 309 insertions(+), 110 deletions(-) diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index ee926359c0..2de3ce3190 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -107,7 +107,7 @@ export interface HandlerContextConfig { sourceEntityUrl: string, id: string, opts: ForkOptions - ) => Promise<{ url: string }> + ) => Promise doMkdb: ( id: string, schema: TSchema @@ -768,10 +768,10 @@ export function createHandlerContext( sourceEntityUrl: string, id: string, opts?: ForkOptions - ): Promise<{ url: string }> { + ): Promise { return config.doFork(sourceEntityUrl, id, opts ?? {}) }, - forkSelf(id: string, opts?: ForkOptions): Promise<{ url: string }> { + forkSelf(id: string, opts?: ForkOptions): Promise { return config.doFork(config.entityUrl, id, opts ?? {}) }, mkdb( diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index d63442eb98..bb46b555d2 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -10,6 +10,7 @@ import { createSetupContext } from './setup-context' import type { WiringConfig } from './setup-context' import { createEntityLogPrefix, runtimeLog } from './log' import { createRuntimeServerClient } from './runtime-server-client' +import type { RuntimeServerClient } from './runtime-server-client' import { unrestrictedSandbox } from './sandbox/unrestricted' import { resolveSandboxIdentity } from './sandbox/identity' import { appendPathToUrl } from './url' @@ -108,6 +109,36 @@ function inboxEventKey(event: ChangeEvent): string { return String(event.key) } +// Translate the user-facing `Wake` into the wakeRegistry-compatible +// shape used by `serverClient.forkEntity`. Same translation +// `createOrGetChild` does inline for spawn (subscriberUrl is fixed by +// the caller — typically the new fork's parent). +type ServerForkWake = NonNullable< + Parameters[0][`wake`] +> +function normalizeForkWake(wake: Wake, subscriberUrl: string): ServerForkWake { + const isRunFinished = + wake === `runFinished` || + (typeof wake === `object` && wake.on === `runFinished`) + const condition = isRunFinished + ? (`runFinished` as const) + : (wake as Exclude) + const result: ServerForkWake = { + subscriberUrl, + condition, + } + if (typeof wake === `object` && wake.on === `runFinished`) { + if (wake.includeResponse !== undefined) { + result.includeResponse = wake.includeResponse + } + } + if (typeof wake === `object` && wake.on === `change`) { + if (wake.debounceMs !== undefined) result.debounceMs = wake.debounceMs + if (wake.timeoutMs !== undefined) result.timeoutMs = wake.timeoutMs + } + return result +} + function toError(err: unknown): Error { return err instanceof Error ? err : new Error(String(err)) } @@ -1334,13 +1365,21 @@ export async function processWake( sourceEntityUrl: string, opts?: Parameters[1] ): Promise<{ entityUrl: string; streamPath: string }> => { + // Normalize the user-facing Wake into the wakeRegistry- + // compatible shape — same translation `createOrGetChild` does + // for spawn. subscriberUrl is derived from `opts.parent` + // (the only valid target after the route's wake validation). + const wakeOpt = + opts?.wake && opts.parent + ? normalizeForkWake(opts.wake, opts.parent) + : undefined const result = await serverClient.forkEntity({ sourceEntityUrl, ...(opts?.instanceId !== undefined && { instanceId: opts.instanceId, }), ...(opts?.parent !== undefined && { parent: opts.parent }), - ...(opts?.wake !== undefined && { wake: opts.wake }), + ...(wakeOpt && { wake: wakeOpt }), ...(opts?.initialMessage !== undefined && { initialMessage: opts.initialMessage, }), @@ -1770,94 +1809,12 @@ export async function processWake( return setupCtx.spawn(type, id, spawnArgs, opts) } - const doFork = async ( + const doFork = ( sourceEntityUrl: string, id: string, opts: ForkOptions - ): Promise<{ url: string }> => { - const { initialMessage, wake: callerWake, tags, observe = true } = opts - // Child-fork (default `observe: true`): the new fork is created - // with `parent = me`, and a `runFinished + includeResponse` wake - // is registered on it server-side, mirroring the spawn flow. - // Reply delivery back to the parent uses the same manifest- - // anchored wake the spawn flow uses. - // - // Fire-and-forget (`observe: false`): no parent link, no wake, - // no manifest entry. The fork exists as a top-level sibling and - // the caller never hears back about it. Mirrors `spawn`'s - // `observe: false` semantics. - const wakeForRegistration = !observe - ? undefined - : (normalizeWake(callerWake) ?? { - subscriberUrl: entityUrl, - condition: `runFinished` as const, - includeResponse: true, - }) - const { entityUrl: forkUrl } = await wiringConfig.forkEntity( - sourceEntityUrl, - { - instanceId: id, - ...(observe && { parent: entityUrl }), - ...(wakeForRegistration && { wake: wakeForRegistration }), - ...(initialMessage !== undefined && { initialMessage }), - ...(tags && { tags }), - } - ) - if (observe) { - const segments = forkUrl.split(`/`).filter(Boolean) - const forkType = segments[0] ?? `unknown` - const forkId = segments[1] ?? forkUrl - const childKey = manifestChildKey(forkType, forkId) - // Mirror the wake into the manifest entry for parity with - // spawn's bookkeeping — the server-side wake registry is the - // authoritative delivery mechanism; this keeps the parent's - // own state machine aware of the relationship across wakes. - const manifestWake = callerWake ?? { - on: `runFinished` as const, - includeResponse: true, - } - wakeSession.registerManifestEntry({ - kind: `child`, - key: childKey, - id: forkId, - entity_type: forkType, - entity_url: forkUrl, - observed: true, - wake: manifestWake, - }) - } - return { url: forkUrl } - } - - // Translate a user-facing `Wake` (literal `runFinished` or - // structured) into the wakeRegistry-compatible shape used by - // `wiring.forkEntity`'s `wake` field. Same translation - // createOrGetChild does for spawn (process-wake.ts:1272-1295). - type ForkWake = NonNullable< - NonNullable[1]>[`wake`] - > - function normalizeWake(wake: Wake | undefined): ForkWake | undefined { - if (wake === undefined) return undefined - const isRunFinished = - wake === `runFinished` || - (typeof wake === `object` && wake.on === `runFinished`) - const condition: ForkWake[`condition`] = isRunFinished - ? `runFinished` - : (wake as Exclude) - const result: ForkWake = { - subscriberUrl: entityUrl, - condition, - } - if (typeof wake === `object` && wake.on === `runFinished`) { - if (wake.includeResponse !== undefined) { - result.includeResponse = wake.includeResponse - } - } - if (typeof wake === `object` && wake.on === `change`) { - if (wake.debounceMs !== undefined) result.debounceMs = wake.debounceMs - if (wake.timeoutMs !== undefined) result.timeoutMs = wake.timeoutMs - } - return result + ): Promise => { + return setupCtx.fork(sourceEntityUrl, id, opts) } const doMkdb = ( diff --git a/packages/agents-runtime/src/setup-context.ts b/packages/agents-runtime/src/setup-context.ts index a2a8b295c4..4e64c6c633 100644 --- a/packages/agents-runtime/src/setup-context.ts +++ b/packages/agents-runtime/src/setup-context.ts @@ -45,7 +45,6 @@ import type { Wake, WakeSession, } from './types' -import type { TagOperation } from './tags' interface EffectScope { register: ( @@ -89,21 +88,18 @@ export interface WiringConfig { * mint one (currently `-fork-`). */ instanceId?: string + /** + * Parent URL for the new fork. When set, the fork becomes a + * child of this URL and the wake registration's subscriberUrl + * is derived from it (matching the spawn route's contract). + */ parent?: string - wake?: { - subscriberUrl: string - condition: - | `runFinished` - | { - on: `change` - collections?: Array - ops?: Array - } - debounceMs?: number - timeoutMs?: number - includeResponse?: boolean - manifestKey?: string - } + /** + * User-facing Wake; the wiring impl normalizes it into the + * wakeRegistry-compatible shape before sending to the server + * — same translation `createOrGetChild` does for spawn. + */ + wake?: Wake initialMessage?: unknown tags?: Record } @@ -171,6 +167,26 @@ export interface SetupContextResult { observe?: boolean } ) => Promise + /** + * Fork a source entity at its latest completed run. Returns an + * EntityHandle for the new fork — same shape spawn returns — so the + * caller can `await fork.run` etc. The fork is created as a CHILD + * of this setup-context's entity (parent = entityUrl) unless + * `observe: false`. Mirrors spawn's flow: builds the handle, wires a + * spawn handle on wakeSession to resolve the run promise on + * runFinished, then calls `wiring.forkEntity` to do the server-side + * fork. + */ + fork: ( + sourceEntityUrl: string, + id: string, + opts?: { + initialMessage?: unknown + wake?: Wake + tags?: Record + observe?: boolean + } + ) => Promise send: ( entityUrl: string, payload: unknown, @@ -1240,6 +1256,232 @@ export function createSetupContext( return handle }, + async fork( + sourceEntityUrl: string, + id: string, + opts?: { + initialMessage?: unknown + wake?: Wake + tags?: Record + observe?: boolean + } + ): Promise { + const observeChild = opts?.observe !== false + // The fork's type is the source's type — fork is a copy, not a + // new entity type. Parse it from the source URL. + const parsedSourceUrl = sourceEntityUrl + .split(`/`) + .filter((segment) => segment.length > 0) + const type = parsedSourceUrl[0] ?? `unknown` + const childKey = manifestChildKey(type, id) + const childRow = (entityUrl: string): ManifestChildEntry => ({ + kind: `child`, + key: childKey, + id, + entity_type: type, + entity_url: entityUrl, + observed: true, + ...(opts?.wake ? { wake: opts.wake } : {}), + }) + + let runResolve: () => void + let forkError: Error | null = null + let runPromise = new Promise((resolve) => { + runResolve = resolve + }) + + // The server constructs the fork URL as `//` when + // `instance_id` is supplied (current code path), so we know it up + // front. Same property spawn relies on for its handle URL. + let realEntityUrl = `/${type}/${id}` + + const handle: EntityHandle = { + sourceType: `entity`, + get sourceRef() { + return realEntityUrl + }, + get entityUrl() { + return realEntityUrl + }, + type, + db: null as unknown as EntityStreamDB, + events: [], + get run(): Promise { + if (inSetup) { + throw new Error( + `fork.run cannot be called during setup() — use it in effects instead` + ) + } + if (!observeChild) { + return Promise.reject( + new Error( + `fork.run is unavailable — fork(..., { observe: false }) opted out of child observation` + ) + ) + } + if (forkError) { + return Promise.reject(forkError) + } + return runPromise + }, + async text(): Promise> { + if (!observeChild) { + throw new Error( + `fork.text is unavailable — fork(..., { observe: false }) opted out of child observation` + ) + } + await this.run + return readCompletedRunTexts(this.db) + }, + send: (msg: unknown) => { + if (inSetup) { + throw new Error( + `fork.send() cannot be called during setup() — use it in effects instead` + ) + } + if (forkError) { + throw forkError + } + const result = dispatchSend({ + targetUrl: realEntityUrl, + payload: msg, + }) + runPromise = new Promise((resolve) => { + runResolve = resolve + }) + return result + }, + status: () => { + const entries = db.collections.childStatus?.toArray as + | Array + | undefined + return entries?.find( + (e) => + e.entity_url === realEntityUrl || + e.entity_url === `/${type}/${id}` + ) + }, + } + + // ---- Inline wiring (production path) ---- + if (wiring) { + // Mirror spawn's order: register the spawn handle first so the + // runFinished wake can find it on the back-edge; then create + // the server-side fork. + wakeSession.registerSpawnHandle(id, { + wireDb: () => {}, + resolveRun: () => { + runResolve!() + }, + rejectRun: (reason: Error) => { + forkError = reason + }, + updateEntityUrl: (newUrl: string) => { + realEntityUrl = newUrl + if (observeChild) { + wakeSession.registerManifestEntry(childRow(newUrl)) + } + }, + }) + + try { + // For an observed (default) fork, register a wake on the + // new fork firing back to this entity. Default to + // `runFinished + includeResponse` when the caller didn't + // pass a wake — matches spawn's default child-observation + // shape. The wiring impl translates user-facing `Wake` into + // the wakeRegistry's `{ subscriberUrl, condition, ... }`. + const wakeForFork: Wake | undefined = observeChild + ? (opts?.wake ?? { + on: `runFinished`, + includeResponse: true, + }) + : undefined + const { entityUrl: forkUrl, streamPath } = await wiring.forkEntity( + sourceEntityUrl, + { + instanceId: id, + ...(observeChild && { parent: entityUrl }), + ...(wakeForFork && { wake: wakeForFork }), + ...(opts?.initialMessage !== undefined && { + initialMessage: opts.initialMessage, + }), + ...(opts?.tags && { tags: opts.tags }), + } + ) + realEntityUrl = forkUrl + if (observeChild) { + wakeSession.registerManifestEntry(childRow(forkUrl)) + + const childDb = await wiring.createChildDb( + `${config.serverBaseUrl}${streamPath}`, + type, + (event) => { + if ( + event.type === `run` && + event.headers.operation === `update` + ) { + const val = event.value as { status?: string } | undefined + if (val?.status === `completed` || val?.status === `failed`) { + runResolve!() + } + } + } + ) + handle.db = childDb + + const runs = childDb.collections.runs?.toArray as + | Array<{ key: string; status: string }> + | undefined + if (runs) { + const latestRun = runs[runs.length - 1] + if ( + latestRun && + (latestRun.status === `completed` || + latestRun.status === `failed`) + ) { + runResolve!() + } + } + } + } catch (err) { + forkError = err instanceof Error ? err : new Error(String(err)) + throw forkError + } + + observeHandleCache.set(realEntityUrl, handle) + + return handle + } + + // ---- Deferred wiring (unit test path) ---- same shape as spawn. + const dbReady = new Promise((resolveDb, rejectDb) => { + wakeSession.registerSpawnHandle(id, { + wireDb: (childDb: EntityStreamDBWithActions) => { + handle.db = childDb + resolveDb() + }, + resolveRun: () => { + runResolve!() + }, + rejectRun: (reason: Error) => { + forkError = reason + rejectDb(reason) + }, + updateEntityUrl: (newUrl: string) => { + realEntityUrl = newUrl + wakeSession.registerManifestEntry(childRow(newUrl)) + }, + }) + }) + + wakeSession.registerManifestEntry(childRow(realEntityUrl)) + + await dbReady + + return handle + }, + send( targetUrl: string, payload: unknown, diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index 22e00542f5..b49bb67695 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -1042,13 +1042,13 @@ export interface HandlerContext< sourceEntityUrl: string, id: string, opts?: ForkOptions - ) => Promise<{ url: string }> + ) => Promise /** * Convenience wrapper for the common self-fork case — equivalent to * `ctx.fork(ctx.entityUrl, id, opts)`. Mirrors `spawn`'s ergonomics * (where you also don't have to spell out who the parent is). */ - forkSelf: (id: string, opts?: ForkOptions) => Promise<{ url: string }> + forkSelf: (id: string, opts?: ForkOptions) => Promise observe: (( source: ObservationSource & { sourceType: `entity` }, opts?: { wake?: Wake } diff --git a/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts b/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts index 2361a47db9..89e3839644 100644 --- a/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts +++ b/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts @@ -270,7 +270,7 @@ describe(`createHandlerContext: tags + tag mutations`, () => { wakeEvent: { type: `inbox`, payload: `hi` } as any, doObserve: () => Promise.resolve({} as any), doSpawn: () => Promise.resolve({} as any), - doFork: () => Promise.resolve({ url: `/horton/fork` }), + doFork: () => Promise.resolve({} as any), doMkdb: () => ({}) as any, executeSend: async () => ({ sent: true, targetUrl: `/horton/x` }), doSetTag: async (key, value) => { diff --git a/packages/agents/src/tools/fork.ts b/packages/agents/src/tools/fork.ts index e001bf66d7..c1d29dacaa 100644 --- a/packages/agents/src/tools/fork.ts +++ b/packages/agents/src/tools/fork.ts @@ -55,7 +55,7 @@ Omit 'entityUrl' to fork your own session. Pass a different session's URL to for // it's not supplied (same pattern `createSpawnWorkerTool` uses // for the worker's id). const forkId = id ?? `fork-${nanoid(10)}` - const { url } = + const handle = entityUrl !== undefined ? await ctx.fork(entityUrl, forkId, opts) : await ctx.forkSelf(forkId, opts) @@ -67,10 +67,10 @@ Omit 'entityUrl' to fork your own session. Pass a different session's URL to for content: [ { type: `text` as const, - text: `Forked at ${url}. ${dispatchNote} End your turn; you'll wake with the fork's response when its next run finishes (same as a spawned worker).`, + text: `Forked at ${handle.entityUrl}. ${dispatchNote} End your turn; you'll wake with the fork's response when its next run finishes (same as a spawned worker).`, }, ], - details: { forked: true, forkUrl: url }, + details: { forked: true, forkUrl: handle.entityUrl }, } } catch (err) { const message = err instanceof Error ? err.message : `Unknown error`