53 changes: 53 additions & 0 deletions .agents/skills/pi-agent-integration/SKILL.md
@@ -0,0 +1,53 @@
---
name: pi-agent-integration
description: Integrate `@mariozechner/pi-agent-core` as the agent abstraction inside another library or runtime. Use when implementing or refactoring Pi Agent wrappers, streaming bridges, `convertToLlm`/`transformContext`, queueing via `steer`/`followUp`, `continue()` semantics, or timeout/abort/session behavior.
---

Implement Pi-agent consumers with stable streaming, correct queue semantics, and minimal wrapper surface area.

## Step 1: Classify the request

Pick the path before editing:

| Request type | Read first |
| --- | --- |
| Wiring or updating agent wrapper APIs/options | `references/api-surface.md` |
| Adding behavior in a consumer library (chat, orchestration, tools) | `references/common-use-cases.md` |
| Debugging broken streaming/tool/continue behavior | `references/troubleshooting-workarounds.md` |

If the task spans multiple categories, load each relevant file above and skip the rest.

## Step 2: Apply integration guardrails

1. Treat `Agent` as the execution engine and keep wrapper abstractions thin.
2. Stream user-visible text only from `message_update` + `assistantMessageEvent.type === "text_delta"`.
3. Bridge deltas into `AsyncIterable<string>` and pass that iterable to downstream streaming surfaces.
4. Preserve message boundaries when streaming multi-message assistant output (insert separators intentionally, then normalize).
5. Never call `prompt()` or `continue()` while the agent is running; use `steer()`/`followUp()` for mid-run input.
6. Keep `convertToLlm` and `transformContext` explicit, deterministic, and easy to test.
7. Keep tool calls/results as internal execution artifacts unless product UX explicitly requires otherwise.
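
Guardrails 2–4 can be sketched as a minimal event-to-stream bridge. The event shapes below are assumed from the contract described in this skill (`message_update` carrying `assistantMessageEvent.type === "text_delta"`, plus a terminal `agent_end`); verify field names against the installed `@mariozechner/pi-agent-core` version before relying on them.

```typescript
// Hedged sketch: bridge text deltas into an AsyncIterable<string>.
// AgentEventLike/AgentLike are illustrative stand-ins for the package types.
type AgentEventLike = {
  type: string;
  assistantMessageEvent?: { type: string; text?: string };
};

interface AgentLike {
  subscribe(listener: (event: AgentEventLike) => void): () => void;
}

function textDeltaStream(agent: AgentLike): AsyncIterable<string> {
  const queue: string[] = [];
  let notify: (() => void) | null = null;
  let done = false;

  const unsubscribe = agent.subscribe((event) => {
    // Only user-visible text deltas reach the stream (guardrail 2).
    if (event.type === "message_update" && event.assistantMessageEvent?.type === "text_delta") {
      queue.push(event.assistantMessageEvent.text ?? "");
      notify?.();
    }
    if (event.type === "agent_end") {
      done = true;
      notify?.();
    }
  });

  return {
    async *[Symbol.asyncIterator]() {
      try {
        while (true) {
          while (queue.length > 0) yield queue.shift()!;
          if (done) return; // always terminate the iterable
          await new Promise<void>((resolve) => { notify = resolve; });
          notify = null;
        }
      } finally {
        unsubscribe(); // detach even if the consumer breaks early
      }
    }
  };
}
```

The bridge buffers deltas so a slow consumer never drops events, and the `finally` block guarantees the listener is removed however iteration ends.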

## Step 3: Implement with minimal surface

1. Prefer constructor options over custom wrapper state machines (`streamFn`, `getApiKey`, `sessionId`, `thinkingBudgets`, `maxRetryDelayMs`).
2. Use `transformContext` for pruning/injection and `convertToLlm` for message-role conversion/filtering.
3. Keep queue mode explicit (`steeringMode`, `followUpMode`) when concurrency/order matters.
4. For server-proxied model access, use `streamFn` with `streamProxy`-style behavior instead of bespoke provider logic in consumers.
5. Keep failure behavior explicit: timeout/abort paths should set observable diagnostics and terminate streaming cleanly.
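
Point 5 above can be sketched as a timeout-controlled turn. This assumes `prompt()` returns a promise and `abort()` cancels the active run, as described in the API reference; the helper name and return shape are illustrative.

```typescript
// Hedged sketch: race a prompt against a timeout, abort on expiry,
// and surface an explicit diagnostic instead of hanging the stream.
async function promptWithTimeout(
  agent: { prompt(text: string): Promise<void>; abort(): void },
  text: string,
  timeoutMs: number
): Promise<{ timedOut: boolean }> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timeout">((resolve) => {
    timer = setTimeout(() => resolve("timeout"), timeoutMs);
  });
  try {
    const winner = await Promise.race([
      agent.prompt(text).then(() => "done" as const),
      timeout
    ]);
    if (winner === "timeout") {
      agent.abort(); // terminate streaming cleanly (guardrail from Step 2)
      return { timedOut: true };
    }
    return { timedOut: false };
  } finally {
    clearTimeout(timer); // never leak the timer on either path
  }
}
```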

## Step 4: Verify behavior

1. Verify event-to-stream bridge emits only text deltas and always closes the iterable.
2. Verify `prompt()`/`continue()` race handling (throws while streaming; queue path works via `steer`/`followUp`).
3. Verify `continue()` preconditions: non-empty context and valid last-message role semantics.
4. Verify custom message types survive agent state while `convertToLlm` emits only LLM-compatible roles.
5. Verify tool execution and turn lifecycle events remain internal unless explicitly exposed.
6. Verify newline joining/normalization parity between streamed and finalized outputs.

## Step 5: Migration and version checks

1. Check for queue API migrations (`queueMessage` -> `steer`/`followUp`) before editing old wrappers.
2. Check renamed hooks/options (`messageTransformer` -> `convertToLlm`, `preprocessor` -> `transformContext`).
3. Check default/available options in current package version before adding compatibility shims.
4. Favor hard cutovers unless backward compatibility is explicitly requested.

63 changes: 63 additions & 0 deletions .agents/skills/pi-agent-integration/SOURCES.md
@@ -0,0 +1,63 @@
# Sources

Retrieved: 2026-03-05
Skill class: `integration-documentation`
Selected profile: `references/examples/documentation-skill.md`

## Source inventory

| Source | Trust tier | Confidence | Contribution | Usage constraints |
| --- | --- | --- | --- | --- |
| `AGENTS.md` (junior) | canonical | high | Repository conventions and Pi streaming standard (`message_update`/`text_delta` -> `AsyncIterable`) | Repo-local guidance |
| `.agents/skills/skill-writer/SKILL.md` | canonical | high | Required workflow for synthesis/authoring/validation outputs | Skill-authoring process source |
| `.agents/skills/skill-writer/references/mode-selection.md` | canonical | high | Class selection and required outputs | Process guidance |
| `.agents/skills/skill-writer/references/synthesis-path.md` | canonical | high | Provenance, coverage matrix, depth gates | Process guidance |
| `.agents/skills/skill-writer/references/authoring-path.md` | canonical | high | Required artifact set for integration-documentation skills | Process guidance |
| `.agents/skills/skill-writer/references/description-optimization.md` | canonical | high | Trigger quality constraints | Process guidance |
| `.agents/skills/skill-writer/references/evaluation-path.md` | canonical | high | Lightweight evaluation rubric | Process guidance |
| `<pi-mono>/packages/agent/README.md` | canonical | high | Public API intent, event flow, message pipeline semantics | External repo snapshot at retrieval date |
| `<pi-mono>/packages/agent/src/types.ts` | canonical | high | Type-level contracts for `AgentLoopConfig`, events, and tools | Source of truth for interfaces |
| `<pi-mono>/packages/agent/src/agent.ts` | canonical | high | Runtime semantics for `prompt`, `continue`, queueing, state transitions | Source of truth for behavior |
| `<pi-mono>/packages/agent/src/agent-loop.ts` | canonical | high | Loop/event ordering and transform/convert call boundary | Source of truth for loop behavior |
| `<pi-mono>/packages/agent/src/proxy.ts` | canonical | medium | Proxy streaming model and error path behavior | Focused on proxy mode only |
| `<pi-mono>/packages/agent/CHANGELOG.md` | secondary | medium | Migration/renaming guidance and breaking changes | Historical summaries, validate against source |
| `<pi-mono>/packages/agent/test/agent.test.ts` | canonical | high | Concurrency/queue/continue edge-case behavior | Test-backed behavioral assertions |
| `<pi-mono>/packages/agent/test/agent-loop.test.ts` | canonical | high | transform/convert ordering, event semantics | Test-backed behavioral assertions |
| `specs/harness-agent-spec.md` | canonical | high | Consumer-side integration contract in junior runtime | Repo-local runtime spec |
| `packages/junior/src/chat/respond.ts` | canonical | high | Real-world Pi streaming bridge and timeout handling pattern | Consumer implementation snapshot |
| `packages/junior/src/chat/runtime/streaming.ts` | canonical | high | `AsyncIterable<string>` bridge behavior | Consumer implementation snapshot |

## Decisions

| Decision | Status | Evidence |
| --- | --- | --- |
| Classify skill as `integration-documentation` | adopted | `mode-selection.md` + user goal ("using Pi in another library") |
| Keep skill focused on consumer integration (not authoring internals) | adopted | User request + `harness-agent-spec.md` + `respond.ts` |
| Make event-stream bridge (`message_update`/`text_delta`) a primary guardrail | adopted | `AGENTS.md`, `README.md`, `respond.ts` |
| Require explicit queue/concurrency guidance (`steer`/`followUp`, `continue`) | adopted | `agent.ts`, tests, changelog |
| Include migration checks for renamed APIs | adopted | `CHANGELOG.md`, `agent.ts`, `types.ts` |
| Add proxy transport guidance as optional path | adopted | `proxy.ts` + constructor `streamFn` option |
| Add provider-specific model recommendations | rejected | Out of scope for abstraction-level integration skill |

## Coverage matrix

| Dimension | Coverage status | Evidence |
| --- | --- | --- |
| API surface and behavior contracts | complete | `types.ts`, `agent.ts`, `agent-loop.ts`, `README.md` |
| Config/runtime options | complete | `agent.ts` options + `README.md` options sections |
| Common downstream use cases | complete | `respond.ts`, `streaming.ts`, `harness-agent-spec.md`, tests |
| Known issues/failure modes with workarounds | complete | `agent.test.ts`, `agent-loop.test.ts`, changelog fixes |
| Version/migration variance | complete | `CHANGELOG.md` breaking/renamed APIs |

## Open gaps

- Add integration examples for browser-only consumers that use `streamProxy` with non-fetch runtimes.
- Expand troubleshooting with provider-specific retry/backoff examples after confirming stable patterns in upstream docs.

## Stopping rationale

Additional retrieval is currently low-yield because:

1. API contracts are already covered by source code and tests in `packages/agent`.
2. Consumer integration patterns are already represented by concrete junior runtime code (`respond.ts`, streaming bridge).
3. Remaining gaps are variant-specific extensions, not blockers for the core integration skill.
53 changes: 53 additions & 0 deletions .agents/skills/pi-agent-integration/references/api-surface.md
@@ -0,0 +1,53 @@
# API Surface

Primary package: `@mariozechner/pi-agent-core`

## Core exports

- `Agent` class (`src/agent.ts`)
- `agentLoop`, `agentLoopContinue` (`src/agent-loop.ts`)
- `streamProxy` (`src/proxy.ts`)
- Types from `src/types.ts`: `AgentMessage`, `AgentTool`, `AgentEvent`, `AgentState`, `AgentLoopConfig`, `StreamFn`

## `Agent` constructor options

- `initialState` (`systemPrompt`, `model`, `thinkingLevel`, `tools`, `messages`)
- `convertToLlm(messages)` for message conversion/filtering
- `transformContext(messages, signal)` for pruning/injection before conversion
- `steeringMode`, `followUpMode` (`"one-at-a-time"` or `"all"`)
- `streamFn` for custom/proxied streaming
- `sessionId`, `getApiKey`, `thinkingBudgets`, `transport`, `maxRetryDelayMs`

## Core runtime methods

- Prompting: `prompt(string | AgentMessage | AgentMessage[])`, `continue()`
- Queueing: `steer(message)`, `followUp(message)`, plus clear/dequeue helpers
- State mutation: `setSystemPrompt`, `setModel`, `setThinkingLevel`, `setTools`, `replaceMessages`, `appendMessage`, `clearMessages`, `reset`
- Lifecycle: `abort()`, `waitForIdle()`, `subscribe(listener)`

## Event contract

- Lifecycle events: `agent_start`, `turn_start`, `turn_end`, `agent_end`
- Message events: `message_start`, `message_update`, `message_end`
- Tool events: `tool_execution_start`, `tool_execution_update`, `tool_execution_end`
- Streaming text should be read from `message_update` where `assistantMessageEvent.type === "text_delta"`

## Message pipeline contract

`AgentMessage[]` -> `transformContext()` -> `convertToLlm()` -> LLM `Message[]`

- `transformContext`: keep message-level behavior (pruning, external context injection)
- `convertToLlm`: convert/filter to provider-compatible `user`/`assistant`/`toolResult` messages
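
The two-stage pipeline can be sketched as follows. The message shapes are illustrative placeholders for the real `AgentMessage`/LLM `Message` types in `src/types.ts`; only the ordering (transform first, then convert) is asserted by this contract.

```typescript
// Hedged sketch of transformContext -> convertToLlm ordering.
type AgentMessage = { role: string; content: string };
type LlmMessage = { role: "user" | "assistant" | "toolResult"; content: string };

const LLM_ROLES = new Set(["user", "assistant", "toolResult"]);

// transformContext: deterministic, message-level pruning (keep a fixed tail).
const transformContext = (messages: AgentMessage[]): AgentMessage[] =>
  messages.slice(-20);

// convertToLlm: drop custom app roles, emit only provider-compatible roles.
const convertToLlm = (messages: AgentMessage[]): LlmMessage[] =>
  messages
    .filter((m) => LLM_ROLES.has(m.role))
    .map((m) => ({ role: m.role as LlmMessage["role"], content: m.content }));

function toLlmContext(messages: AgentMessage[]): LlmMessage[] {
  return convertToLlm(transformContext(messages));
}
```

Keeping both functions pure makes the before/after context assertions in the verification step trivial to write.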

## Continue/queue semantics

- `prompt()` and `continue()` throw if `isStreaming` is true.
- `continue()` requires message history and valid tail state.
- If tail is `assistant`, `continue()` can resume queued `steer`/`followUp`; otherwise it throws.
- Mid-run user input should be queued with `steer` or `followUp`, not re-entered with `prompt`.
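
The preconditions above can be collected into one guard. This mirrors the documented semantics (and the troubleshooting rows for `continue()`), but the exact throw conditions should be confirmed against the installed `agent.ts`; the state shape here is illustrative.

```typescript
// Hedged sketch: gate continue() on the documented preconditions.
type TailMsg = { role: "user" | "assistant" | "toolResult" };

function canContinue(state: {
  isStreaming: boolean;
  messages: TailMsg[];
  queuedCount: number; // pending steer/followUp messages
}): boolean {
  if (state.isStreaming) return false;           // throws while streaming
  if (state.messages.length === 0) return false; // "No messages to continue from"
  const tail = state.messages[state.messages.length - 1];
  if (tail.role === "assistant") {
    return state.queuedCount > 0; // assistant tail needs queued steer/followUp
  }
  return true; // user/toolResult tail can resume directly
}
```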

## Version and migration points to check

- Queue API migration: `queueMessage` replaced by `steer`/`followUp`
- Option migration: `messageTransformer` -> `convertToLlm`, `preprocessor` -> `transformContext`
- Transport abstraction changes: prefer `streamFn` customization for proxy/server routing
33 changes: 33 additions & 0 deletions .agents/skills/pi-agent-integration/references/common-use-cases.md
@@ -0,0 +1,33 @@
# Common Use Cases

Use these patterns when Pi `Agent` is consumed by another library/runtime.

1. Stream assistant text into another SDK surface:
Use `agent.subscribe` and forward only `message_update` + `text_delta` into an `AsyncIterable<string>` bridge.

2. Preserve streamed-vs-final output parity:
Insert separators between assistant message boundaries during delta streaming so final joined text matches non-streamed output semantics.

3. Add custom app messages without leaking them to LLM calls:
Keep custom message types in agent state; filter/convert them in `convertToLlm`.

4. Prune or augment context safely:
Use `transformContext` for context window control and deterministic context injection before `convertToLlm`.

5. Support user steering while tools are running:
Use `steer` for interruptions and `followUp` for deferred prompts instead of issuing parallel `prompt()` calls.

6. Implement timeout-controlled turns:
Race prompt execution against a timeout, call `agent.abort()` on timeout, and surface explicit timeout diagnostics.

7. Resume across session slices/checkpoints:
Restore message history (for example via `replaceMessages`) and call `continue()` with valid tail-state semantics.

8. Route through backend-proxied model access:
Provide a custom `streamFn` (or `streamProxy`) so auth/provider calls stay server-side while preserving local `Agent` event semantics.

9. Handle expiring provider tokens:
Use `getApiKey` dynamic resolution for each LLM call instead of static long-lived API keys.

10. Tune transport/retry constraints:
Set `transport` and `maxRetryDelayMs` intentionally for consumer runtime behavior and bounded latency.
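
Pattern 5 above reduces to a small routing helper. It assumes an `isStreaming` flag and a `steer` method as described in the API reference; whether a given input should instead go through `followUp` is a product decision.

```typescript
// Hedged sketch: route mid-run input to the queue instead of re-prompting.
interface SteerableAgent {
  isStreaming: boolean;
  prompt(text: string): Promise<void>;
  steer(text: string): void;
}

function submitUserInput(agent: SteerableAgent, text: string): "queued" | "prompted" {
  if (agent.isStreaming) {
    agent.steer(text); // mid-run: queue, never call prompt() in parallel
    return "queued";
  }
  void agent.prompt(text); // idle: start a normal turn
  return "prompted";
}
```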
50 changes: 50 additions & 0 deletions .agents/skills/pi-agent-integration/references/troubleshooting-workarounds.md
@@ -0,0 +1,50 @@
# Troubleshooting and Workarounds

Use this table when Pi-agent integration behavior is wrong in a consumer library.

| Symptom | Likely cause | Fix |
| --- | --- | --- |
| `prompt()` throws "Agent is already processing a prompt..." | Concurrent prompt while `isStreaming` is true | Queue input with `steer`/`followUp` or await existing run completion |
| `continue()` throws while agent is active | `continue()` called during streaming | Wait for idle, then call `continue()` |
| `continue()` throws "No messages to continue from" | Empty message history | Seed context with user/toolResult history before `continue()` |
| `continue()` throws from assistant-tail state | No queued steering/follow-up messages when tail is assistant | Queue `steer`/`followUp` first, or call `prompt()` with new user message |
| Stream shows no text even though turn finishes | Listener filtering wrong event type | Consume `message_update` events with `assistantMessageEvent.type === "text_delta"` |
| Streamed text and final text differ in formatting | Missing boundaries between assistant message segments | Insert explicit separators between message boundaries and normalize downstream |
| Tool call artifacts leak into user-visible output | Consumer is rendering tool calls/tool results directly | Keep tool lifecycle artifacts internal and render only resolved assistant text |
| Custom message roles break provider calls | `convertToLlm` passes non-LLM-compatible roles | Filter/transform to provider-compatible message roles in `convertToLlm` |
| Context pruning removes critical state unexpectedly | `transformContext` is non-deterministic or too aggressive | Make pruning deterministic and test with before/after context assertions |
| Queue order surprises in multi-message steering | Queue mode defaults not explicit | Set `steeringMode`/`followUpMode` intentionally (`one-at-a-time` vs `all`) |
| Timeouts do not cleanly stop UI stream | Timeout path does not call `abort()` and close stream bridge | Abort agent on timeout, always end iterable in `finally` |
| Proxy streaming errors are opaque | Proxy response/event parsing not surfaced | Validate proxy response status/body and emit explicit error diagnostics |
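
The two formatting rows above (missing boundaries, stream/final mismatch) share one fix: run a single normalization function on both paths. The separator and normalization rule below are illustrative; the point is that they must be identical for streamed and finalized output.

```typescript
// Hedged sketch: one separator, one normalizer, applied on both paths.
const SEPARATOR = "\n\n";

function normalizeOutput(text: string): string {
  // Collapse runs of blank lines and trim, identically everywhere.
  return text.replace(/\n{3,}/g, SEPARATOR).trim();
}

function joinStreamedSegments(segments: string[]): string {
  // Insert the boundary separator between assistant messages, then apply
  // the same normalization the non-streamed path uses.
  return normalizeOutput(segments.join(SEPARATOR));
}
```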

## Issue/fix checklist

1. Concurrent prompt failure:
Use `steer`/`followUp` during active runs; do not call `prompt` again until idle.

2. Continue during stream:
Gate `continue()` with `isStreaming`/`waitForIdle()` checks.

3. Empty continue context:
Load prior `AgentMessage[]` before `continue()` calls.

4. Assistant-tail continue rejection:
Queue a steering or follow-up message first, or start a fresh prompt.

5. Missing text deltas:
Filter to `message_update` + `text_delta`; ignore other delta types for user text stream.

6. Stream/final mismatch:
Insert message-boundary separators and apply identical normalization in streamed and final output paths.

7. Invalid custom roles at provider boundary:
Map custom messages in `convertToLlm`; keep only provider-compatible roles.

8. Over-pruned context:
Make `transformContext` deterministic and verify retained messages in tests.

9. Queue-order surprises:
Set `steeringMode` and `followUpMode` explicitly in wrappers.

10. Timeout leak:
Always abort and close iterable in `finally` blocks.
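
Checklist item 10 can be sketched as a turn runner whose `finally` block always aborts cleanup and closes the stream bridge. The agent and push-stream shapes are illustrative; only the `finally` discipline is the point.

```typescript
// Hedged sketch: abort on external cancellation, close the bridge in finally.
async function runTurn(
  agent: { prompt(text: string): Promise<void>; abort(): void },
  stream: { end(): void },
  text: string,
  signal: AbortSignal
): Promise<void> {
  const onAbort = () => agent.abort();
  signal.addEventListener("abort", onAbort, { once: true });
  try {
    await agent.prompt(text);
  } finally {
    signal.removeEventListener("abort", onAbort);
    stream.end(); // the iterable must always close, even on error/timeout
  }
}
```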
18 changes: 18 additions & 0 deletions packages/junior/src/chat/queue/client.ts
@@ -1,13 +1,18 @@
import { handleCallback, send } from "@vercel/queue";

const DEFAULT_TOPIC_NAME = "junior-thread-message";
const SUBAGENT_TASK_TOPIC_NAME = "junior-subagent-task";

const MAX_DELIVERY_ATTEMPTS = 10;

export function getThreadMessageTopic(): string {
  return DEFAULT_TOPIC_NAME;
}

export function getSubagentTaskTopic(): string {
  return SUBAGENT_TASK_TOPIC_NAME;
}

export async function enqueueThreadMessage(
  payload: unknown,
  options?: {
@@ -21,6 +26,19 @@ export async function enqueueThreadMessage(
  return result.messageId ?? undefined;
}

export async function enqueueSubagentTask(
  payload: unknown,
  options?: {
    idempotencyKey?: string;
  }
): Promise<string | undefined> {
  const result = await send(getSubagentTaskTopic(), payload, {
    ...(options?.idempotencyKey ? { idempotencyKey: options.idempotencyKey } : {})
  });

  return result.messageId ?? undefined;
}

export function createQueueCallbackHandler<T>(
  handler: (message: T, metadata: { messageId: string; deliveryCount: number; topicName: string }) => Promise<void>
): (request: Request) => Promise<Response> {
14 changes: 14 additions & 0 deletions packages/junior/src/chat/queue/process-thread-message.ts
@@ -11,6 +11,7 @@ import {
} from "@/chat/state";
import { processThreadMessageRuntime } from "@/chat/thread-runtime/process-thread-message-runtime";
import type { ThreadMessagePayload } from "@/chat/queue/types";
import { isRetryableTurnError } from "@/chat/turn/errors";

let stateAdapterConnected = false;

@@ -186,6 +187,19 @@ export async function processQueuedThreadMessage(
      throw new QueueMessageOwnershipError("complete", payload.dedupKey);
    }
  } catch (error) {
    if (isRetryableTurnError(error, "subagent_task_deferred")) {
      const completed = await completeQueueMessageProcessingOwnership({
        rawKey: payload.dedupKey,
        ownerToken,
        queueMessageId: payload.queueMessageId
      });

      if (!completed) {
        throw new QueueMessageOwnershipError("complete", payload.dedupKey);
      }
      return;
    }

    const errorMessage = error instanceof Error ? error.message : String(error);

    await logThreadMessageFailure(payload, errorMessage);
17 changes: 17 additions & 0 deletions packages/junior/src/chat/queue/types.ts
@@ -2,6 +2,13 @@ import type { Message, SerializedMessage, SerializedThread, Thread } from "chat"

export type ThreadMessageKind = "new_mention" | "subscribed_message" | "subscribed_reply";

export interface QueueResumeContext {
  dedupKey: string;
  message: Message | SerializedMessage;
  normalizedThreadId: string;
  thread: Thread | SerializedThread;
}

export interface ThreadMessagePayload {
  dedupKey: string;
  kind: ThreadMessageKind;
@@ -10,3 +17,13 @@ thread: Thread | SerializedThread;
  thread: Thread | SerializedThread;
  queueMessageId?: string;
}

export interface SubagentTaskPayload {
  callKey: string;
  conversationId: string;
  sessionId: string;
  task: string;
  queueContext: QueueResumeContext;
}

export type QueuePayload = ThreadMessagePayload | SubagentTaskPayload;