Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/fork-tool-horton.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@electric-ax/agents': patch
---

Give Horton a `fork` tool that creates a child session inheriting this conversation's history up to the latest completed response. Takes an optional `entityUrl` (omit for self-fork), an optional `initialMessage` (delivered atomically — no follow-up `send` needed), and optional `tags`. The fork is created as a CHILD of the calling entity (same parent-ownership model as `spawn_worker`) and wires reply delivery through the same manifest-anchored wake — when the fork's next run finishes, the parent wakes with the response in the wake message.

Horton's system prompt grows a "When to fork (vs spawn_worker)" section framing the two tools as a pair: both create a child the parent owns and gets replies from, the difference is what the child boots with — `spawn_worker` starts with an empty context (you brief it from scratch), `fork` starts with a copy of the conversation up to the latest completed response. Includes an explicit trigger pattern ("prefer fork when generating multiple variants the user wants to compare; don't inline") to route "give me three takes" / "evaluate these N approaches" prompts to fork rather than collapsing them into one inline response, plus the workflow for the parallel-exploration loop (end-turn-first, fork-once-per-branch with a different `initialMessage` each, wait for all responses before synthesising).
21 changes: 21 additions & 0 deletions .changeset/fork-tool-runtime.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
'@electric-ax/agents-runtime': patch
---

Add `ctx.fork(opts?)` to `HandlerContext`, with an opts shape that mirrors `ctx.spawn`'s where the semantics map:

```ts
ctx.fork(opts?: {
targetEntityUrl?: string // omit for self-fork
initialMessage?: unknown // delivered atomically with creation
wake?: Wake // overrides the default runFinished + includeResponse
tags?: Record<string, string>
observe?: boolean // `false` = fire-and-forget (no parent, no wake, no manifest entry)
})
```

By default (`observe: true`), the new fork is a CHILD of this entity (same parent-ownership model as `ctx.spawn`), and a `runFinished + includeResponse` wake is registered on it server-side. Reply delivery uses the same manifest-anchored wake mechanism `ctx.spawn` uses — when the fork's next run finishes, this entity wakes with the response. `observe: false` opts out of the parent relationship entirely: no parent URL, no wake subscription, no manifest entry on the parent's stream.

Internally writes a `kind: 'child'` manifest row on the parent's stream alongside the server-side wake registration, mirroring the spawn flow's bookkeeping so the relationship persists across wakes. Wired through new fields on `RuntimeServerClient.forkEntity` (`parent`, `wake`, `initialMessage`, `tags`) and `WiringConfig.forkEntity`. A `normalizeWake` helper translates the user-facing `Wake` type into the wakeRegistry-compatible shape, same logic `createOrGetChild` uses for spawn.

The `send` tool's `payload` description now documents the canonical `{ text: "..." }` shape for chat-rendered targets (Horton sessions, agent forks) so messages emitted by `send` render as chat bubbles instead of blank bars.
16 changes: 16 additions & 0 deletions .changeset/fork-tool-server.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
'@electric-ax/agents-server': patch
'@electric-ax/agents-server-ui': patch
---

Add server-resolved fork anchor + spawn-parity body fields to `POST /_electric/entities/<type>/<id>/fork`.

- `anchor: 'latest_completed_run'` is an alternative to `fork_pointer`: the server scans the source root's `main` history, finds the most recent `runs` row with `status === 'completed'`, derives the matching `{ offset, sub_offset }` pointer, and runs the existing pointer-fork path with it. Mutually exclusive with `fork_pointer` (400 if both); 400 if no completed run exists. Lets callers without access to the source's per-row pointer side-table (e.g. an agent forking via a tool) fork at the same anchor the per-row "Fork from here" UI uses.
- `parent` overrides the new root fork's `parent` field, making it a CHILD of that URL (rather than inheriting the source's parent).
- `wake` registers a subscription on the new root fork at fork time (same shape as `spawn`'s `wake`).
- `initialMessage` is delivered to the new root fork via `entityManager.send` after `linkEntityDispatchSubscription` runs — same ordering spawn uses, so the dispatcher is subscribed before the inbox row lands and the fork actually wakes on the message instead of sitting idle.
- `tags` are stamped on the new root fork in addition to those copied from the source.

Together these let an agent fork itself as a child and receive replies via the same manifest-anchored wake mechanism `spawn` uses, with a single round-trip fork-and-dispatch.

Chat UI: `readInboxText` falls back to `message` and `content` keys when `text` isn't present, so messages sent by agents (which sometimes emit those shapes) render as a chat bubble body instead of a blank bar.
16 changes: 16 additions & 0 deletions packages/agents-runtime/src/context-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import type {
EntitySignal,
EntityHandle,
EntityStreamDBWithActions,
ForkOptions,
HandlerContext,
LLMContentBlock,
LLMMessage,
Expand Down Expand Up @@ -102,6 +103,11 @@ export interface HandlerContextConfig<TState extends StateProxy = StateProxy> {
observe?: boolean
}
) => Promise<EntityHandle>
doFork: (
sourceEntityUrl: string,
id: string,
opts: ForkOptions
) => Promise<EntityHandle>
doMkdb: <TSchema extends SharedStateSchemaMap>(
id: string,
schema: TSchema
Expand Down Expand Up @@ -758,6 +764,16 @@ export function createHandlerContext<TState extends StateProxy = StateProxy>(
): Promise<EntityHandle> {
return config.doSpawn(type, id, args, opts)
},
fork(
sourceEntityUrl: string,
id: string,
opts?: ForkOptions
): Promise<EntityHandle> {
return config.doFork(sourceEntityUrl, id, opts ?? {})
},
forkSelf(id: string, opts?: ForkOptions): Promise<EntityHandle> {
return config.doFork(config.entityUrl, id, opts ?? {})
},
mkdb<TSchema extends SharedStateSchemaMap>(
id: string,
schema: TSchema
Expand Down
69 changes: 69 additions & 0 deletions packages/agents-runtime/src/process-wake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import { normalizeObservationSchema } from './observation-schema'
import { createWakeSession } from './wake-session'
import { createHandlerContext } from './context-factory'
import { createSetupContext } from './setup-context'
import type { WiringConfig } from './setup-context'
import { createEntityLogPrefix, runtimeLog } from './log'
import { createRuntimeServerClient } from './runtime-server-client'
import type { RuntimeServerClient } from './runtime-server-client'
import { unrestrictedSandbox } from './sandbox/unrestricted'
import { resolveSandboxIdentity } from './sandbox/identity'
import { appendPathToUrl } from './url'
Expand Down Expand Up @@ -36,6 +38,7 @@ import type {
ObservationHandle,
ObservationSource,
ProcessWakeConfig,
ForkOptions,
SendResult,
SharedStateSchemaMap,
SpawnSandboxOption,
Expand Down Expand Up @@ -106,6 +109,36 @@ function inboxEventKey(event: ChangeEvent): string {
return String(event.key)
}

// Translate the user-facing `Wake` into the wakeRegistry-compatible
// shape used by `serverClient.forkEntity`. Same translation
// `createOrGetChild` does inline for spawn (subscriberUrl is fixed by
// the caller — typically the new fork's parent).
type ServerForkWake = NonNullable<
Parameters<RuntimeServerClient[`forkEntity`]>[0][`wake`]
>
function normalizeForkWake(wake: Wake, subscriberUrl: string): ServerForkWake {
const isRunFinished =
wake === `runFinished` ||
(typeof wake === `object` && wake.on === `runFinished`)
const condition = isRunFinished
? (`runFinished` as const)
: (wake as Exclude<Wake, `runFinished` | { on: `runFinished` }>)
const result: ServerForkWake = {
subscriberUrl,
condition,
}
if (typeof wake === `object` && wake.on === `runFinished`) {
if (wake.includeResponse !== undefined) {
result.includeResponse = wake.includeResponse
}
}
if (typeof wake === `object` && wake.on === `change`) {
if (wake.debounceMs !== undefined) result.debounceMs = wake.debounceMs
if (wake.timeoutMs !== undefined) result.timeoutMs = wake.timeoutMs
}
return result
}

function toError(err: unknown): Error {
return err instanceof Error ? err : new Error(String(err))
}
Expand Down Expand Up @@ -1328,6 +1361,33 @@ export async function processWake(
})
},

forkEntity: async (
sourceEntityUrl: string,
opts?: Parameters<WiringConfig[`forkEntity`]>[1]
): Promise<{ entityUrl: string; streamPath: string }> => {
// Normalize the user-facing Wake into the wakeRegistry-
// compatible shape — same translation `createOrGetChild` does
// for spawn. subscriberUrl is derived from `opts.parent`
// (the only valid target after the route's wake validation).
const wakeOpt =
opts?.wake && opts.parent
? normalizeForkWake(opts.wake, opts.parent)
: undefined
const result = await serverClient.forkEntity({
sourceEntityUrl,
...(opts?.instanceId !== undefined && {
instanceId: opts.instanceId,
}),
...(opts?.parent !== undefined && { parent: opts.parent }),
...(wakeOpt && { wake: wakeOpt }),
...(opts?.initialMessage !== undefined && {
initialMessage: opts.initialMessage,
}),
...(opts?.tags !== undefined && { tags: opts.tags }),
})
return { entityUrl: result.entityUrl, streamPath: result.streamPath }
},

createChildDb: async (
childStreamUrl: string,
childTypeName?: string,
Expand Down Expand Up @@ -1749,6 +1809,14 @@ export async function processWake(
return setupCtx.spawn(type, id, spawnArgs, opts)
}

const doFork = (
sourceEntityUrl: string,
id: string,
opts: ForkOptions
): Promise<EntityHandle> => {
return setupCtx.fork(sourceEntityUrl, id, opts)
}

const doMkdb = <TSchema extends SharedStateSchemaMap>(
id: string,
schema: TSchema
Expand Down Expand Up @@ -2039,6 +2107,7 @@ export async function processWake(
hydratedEventSourceWake: await hydrateCurrentEventSourceWake(),
doObserve,
doSpawn,
doFork,
doMkdb,
doCreateAttachment: (attachment) =>
serverClient
Expand Down
79 changes: 79 additions & 0 deletions packages/agents-runtime/src/runtime-server-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,37 @@ export interface RuntimeServerClient {
id: string
}) => Promise<Uint8Array>
spawnEntity: (options: SpawnEntityOptions) => Promise<RuntimeEntityInfo>
/**
* Fork an entity at the server-resolved `latest_completed_run` anchor.
* Resolves to the new root entity's info. Wraps the agents-server
* `POST /_electric/entities/<type>/<id>/fork` endpoint.
*
* Optional fields mirror `spawnEntity`:
* - `parent` makes the new fork a child of that URL.
* - `wake` registers a subscription at fork time (reply delivery
* uses the parent's manifest-anchored wake when paired with a
* manifest entry on the parent — same model as `spawn`).
* - `initialMessage` delivers an inbox message to the new fork
* atomically with creation, folding fork+send into one round-trip.
* - `tags` stamps tags onto the new fork in addition to those
* copied from the source.
*/
forkEntity: (options: {
sourceEntityUrl: string
/** Maps to the server's `instance_id` body field. */
instanceId?: string
parent?: string
wake?: {
subscriberUrl: string
condition: RegisterWakeOptions[`condition`]
debounceMs?: number
timeoutMs?: number
includeResponse?: boolean
manifestKey?: string
}
initialMessage?: unknown
tags?: Record<string, string>
}) => Promise<RuntimeEntityInfo>
getEntity: (entityUrl: string) => Promise<RuntimeEntityInfo>
ensureSharedStateStream: (
sharedStateId: string,
Expand Down Expand Up @@ -449,6 +480,53 @@ export function createRuntimeServerClient(
return entityInfo
}

const forkEntity = async ({
sourceEntityUrl,
instanceId,
parent,
wake,
initialMessage,
tags,
}: {
sourceEntityUrl: string
instanceId?: string
parent?: string
wake?: {
subscriberUrl: string
condition: RegisterWakeOptions[`condition`]
debounceMs?: number
timeoutMs?: number
includeResponse?: boolean
manifestKey?: string
}
initialMessage?: unknown
tags?: Record<string, string>
}): Promise<RuntimeEntityInfo> => {
const body: Record<string, unknown> = {
anchor: `latest_completed_run`,
}
if (instanceId !== undefined) body.instance_id = instanceId
if (parent !== undefined) body.parent = parent
if (wake !== undefined) body.wake = wake
if (initialMessage !== undefined) body.initialMessage = initialMessage
if (tags !== undefined) body.tags = tags
const response = await request(entityRpcPath(sourceEntityUrl, `/fork`), {
method: `POST`,
headers: { 'content-type': `application/json` },
body: JSON.stringify(body),
})
if (!response.ok) {
throw new Error(
`fork ${sourceEntityUrl} failed (${response.status}): ${await readErrorText(response)}`
)
}
const payload = (await response.json()) as { root?: RuntimeEntityResponse }
return requireEntityInfo(
payload.root,
`fork ${sourceEntityUrl} returned an invalid root payload`
)
}

const ensureSharedStateStream = async (
sharedStateId: string,
ownerEntityUrl?: string
Expand Down Expand Up @@ -778,6 +856,7 @@ export function createRuntimeServerClient(
createAttachment,
readAttachment,
spawnEntity,
forkEntity,
getEntity,
ensureSharedStateStream,
signalEntity,
Expand Down
Loading
Loading