From d253d31cbe61fc73f575a2adb501704b1e6d108f Mon Sep 17 00:00:00 2001 From: David Cramer Date: Thu, 5 Mar 2026 14:42:55 -0800 Subject: [PATCH 1/2] feat(skills): Add Pi agent integration skill Synthesize a new integration-documentation skill for consumers that use @mariozechner/pi-agent-core as their agent abstraction. Document API contracts, common downstream integration patterns, and failure-mode troubleshooting focused on streaming bridges, queue/continue semantics, and migration-safe usage. Co-Authored-By: Codex --- .agents/skills/pi-agent-integration/SKILL.md | 53 ++++++++++++++++ .../skills/pi-agent-integration/SOURCES.md | 63 +++++++++++++++++++ .../references/api-surface.md | 53 ++++++++++++++++ .../references/common-use-cases.md | 33 ++++++++++ .../references/troubleshooting-workarounds.md | 50 +++++++++++++++ 5 files changed, 252 insertions(+) create mode 100644 .agents/skills/pi-agent-integration/SKILL.md create mode 100644 .agents/skills/pi-agent-integration/SOURCES.md create mode 100644 .agents/skills/pi-agent-integration/references/api-surface.md create mode 100644 .agents/skills/pi-agent-integration/references/common-use-cases.md create mode 100644 .agents/skills/pi-agent-integration/references/troubleshooting-workarounds.md diff --git a/.agents/skills/pi-agent-integration/SKILL.md b/.agents/skills/pi-agent-integration/SKILL.md new file mode 100644 index 00000000..3241ef8c --- /dev/null +++ b/.agents/skills/pi-agent-integration/SKILL.md @@ -0,0 +1,53 @@ +--- +name: pi-agent-integration +description: Integrate `@mariozechner/pi-agent-core` as the agent abstraction inside another library or runtime. Use when implementing or refactoring Pi Agent wrappers, streaming bridges, `convertToLlm`/`transformContext`, queueing via `steer`/`followUp`, `continue()` semantics, or timeout/abort/session behavior. +--- + +Implement Pi-agent consumers with stable streaming, correct queue semantics, and minimal wrapper surface area. 
+ +## Step 1: Classify the request + +Pick the path before editing: + +| Request type | Read first | +| --- | --- | +| Wiring or updating agent wrapper APIs/options | `references/api-surface.md` | +| Adding behavior in a consumer library (chat, orchestration, tools) | `references/common-use-cases.md` | +| Debugging broken streaming/tool/continue behavior | `references/troubleshooting-workarounds.md` | + +If the task spans multiple categories, load only the relevant files above. + +## Step 2: Apply integration guardrails + +1. Treat `Agent` as the execution engine and keep wrapper abstractions thin. +2. Stream user-visible text only from `message_update` + `assistantMessageEvent.type === "text_delta"`. +3. Bridge deltas into `AsyncIterable` and pass that iterable to downstream streaming surfaces. +4. Preserve message boundaries when streaming multi-message assistant output (insert separators intentionally, then normalize). +5. Never call `prompt()` or `continue()` while the agent is running; use `steer()`/`followUp()` for mid-run input. +6. Keep `convertToLlm` and `transformContext` explicit, deterministic, and easy to test. +7. Keep tool calls/results as internal execution artifacts unless product UX explicitly requires otherwise. + +## Step 3: Implement with minimal surface + +1. Prefer constructor options over custom wrapper state machines (`streamFn`, `getApiKey`, `sessionId`, `thinkingBudgets`, `maxRetryDelayMs`). +2. Use `transformContext` for pruning/injection and `convertToLlm` for message-role conversion/filtering. +3. Keep queue mode explicit (`steeringMode`, `followUpMode`) when concurrency/order matters. +4. For server-proxied model access, use `streamFn` with `streamProxy`-style behavior instead of bespoke provider logic in consumers. +5. Keep failure behavior explicit: timeout/abort paths should set observable diagnostics and terminate streaming cleanly. + +## Step 4: Verify behavior + +1. 
Verify event-to-stream bridge emits only text deltas and always closes the iterable. +2. Verify `prompt()`/`continue()` race handling (throws while streaming; queue path works via `steer`/`followUp`). +3. Verify `continue()` preconditions: non-empty message history, and a tail message that is not `assistant` unless `steer`/`followUp` input is queued. +4. Verify custom message types survive agent state while `convertToLlm` emits only LLM-compatible roles. +5. Verify tool execution and turn lifecycle events remain internal unless explicitly exposed. +6. Verify newline joining/normalization parity between streamed and finalized outputs. + +## Step 5: Migration and version checks + +1. Check for queue API migrations (`queueMessage` -> `steer`/`followUp`) before editing old wrappers. +2. Check renamed hooks/options (`messageTransformer` -> `convertToLlm`, `preprocessor` -> `transformContext`). +3. Check default/available options in the current package version before adding compatibility shims. +4. Favor hard cutovers unless backward compatibility is explicitly requested. 
+ diff --git a/.agents/skills/pi-agent-integration/SOURCES.md b/.agents/skills/pi-agent-integration/SOURCES.md new file mode 100644 index 00000000..8cedfa24 --- /dev/null +++ b/.agents/skills/pi-agent-integration/SOURCES.md @@ -0,0 +1,63 @@ +# Sources + +Retrieved: 2026-03-05 +Skill class: `integration-documentation` +Selected profile: `references/examples/documentation-skill.md` + +## Source inventory + +| Source | Trust tier | Confidence | Contribution | Usage constraints | +| --- | --- | --- | --- | --- | +| `AGENTS.md` (junior) | canonical | high | Repository conventions and Pi streaming standard (`message_update`/`text_delta` -> `AsyncIterable`) | Repo-local guidance | +| `.agents/skills/skill-writer/SKILL.md` | canonical | high | Required workflow for synthesis/authoring/validation outputs | Skill-authoring process source | +| `.agents/skills/skill-writer/references/mode-selection.md` | canonical | high | Class selection and required outputs | Process guidance | +| `.agents/skills/skill-writer/references/synthesis-path.md` | canonical | high | Provenance, coverage matrix, depth gates | Process guidance | +| `.agents/skills/skill-writer/references/authoring-path.md` | canonical | high | Required artifact set for integration-documentation skills | Process guidance | +| `.agents/skills/skill-writer/references/description-optimization.md` | canonical | high | Trigger quality constraints | Process guidance | +| `.agents/skills/skill-writer/references/evaluation-path.md` | canonical | high | Lightweight evaluation rubric | Process guidance | +| `/packages/agent/README.md` | canonical | high | Public API intent, event flow, message pipeline semantics | External repo snapshot at retrieval date | +| `/packages/agent/src/types.ts` | canonical | high | Type-level contracts for `AgentLoopConfig`, events, and tools | Source of truth for interfaces | +| `/packages/agent/src/agent.ts` | canonical | high | Runtime semantics for `prompt`, `continue`, queueing, state 
transitions | Source of truth for behavior | +| `/packages/agent/src/agent-loop.ts` | canonical | high | Loop/event ordering and transform/convert call boundary | Source of truth for loop behavior | +| `/packages/agent/src/proxy.ts` | canonical | medium | Proxy streaming model and error path behavior | Focused on proxy mode only | +| `/packages/agent/CHANGELOG.md` | secondary | medium | Migration/renaming guidance and breaking changes | Historical summaries, validate against source | +| `/packages/agent/test/agent.test.ts` | canonical | high | Concurrency/queue/continue edge-case behavior | Test-backed behavioral assertions | +| `/packages/agent/test/agent-loop.test.ts` | canonical | high | transform/convert ordering, event semantics | Test-backed behavioral assertions | +| `specs/harness-agent-spec.md` | canonical | high | Consumer-side integration contract in junior runtime | Repo-local runtime spec | +| `packages/junior/src/chat/respond.ts` | canonical | high | Real-world Pi streaming bridge and timeout handling pattern | Consumer implementation snapshot | +| `packages/junior/src/chat/runtime/streaming.ts` | canonical | high | `AsyncIterable` bridge behavior | Consumer implementation snapshot | + +## Decisions + +| Decision | Status | Evidence | +| --- | --- | --- | +| Classify skill as `integration-documentation` | adopted | `mode-selection.md` + user goal ("using Pi in another library") | +| Keep skill focused on consumer integration (not authoring internals) | adopted | User request + `harness-agent-spec.md` + `respond.ts` | +| Make event-stream bridge (`message_update`/`text_delta`) a primary guardrail | adopted | `AGENTS.md`, `README.md`, `respond.ts` | +| Require explicit queue/concurrency guidance (`steer`/`followUp`, `continue`) | adopted | `agent.ts`, tests, changelog | +| Include migration checks for renamed APIs | adopted | `CHANGELOG.md`, `agent.ts`, `types.ts` | +| Add proxy transport guidance as optional path | adopted | `proxy.ts` + constructor 
`streamFn` option | +| Add provider-specific model recommendations | rejected | Out of scope for abstraction-level integration skill | + +## Coverage matrix + +| Dimension | Coverage status | Evidence | +| --- | --- | --- | +| API surface and behavior contracts | complete | `types.ts`, `agent.ts`, `agent-loop.ts`, `README.md` | +| Config/runtime options | complete | `agent.ts` options + `README.md` options sections | +| Common downstream use cases | complete | `respond.ts`, `streaming.ts`, `harness-agent-spec.md`, tests | +| Known issues/failure modes with workarounds | complete | `agent.test.ts`, `agent-loop.test.ts`, changelog fixes | +| Version/migration variance | complete | `CHANGELOG.md` breaking/renamed APIs | + +## Open gaps + +- Add integration examples for browser-only consumers that use `streamProxy` with non-fetch runtimes. +- Expand troubleshooting with provider-specific retry/backoff examples after confirming stable patterns in upstream docs. + +## Stopping rationale + +Additional retrieval is currently low-yield because: + +1. API contracts are already covered by source code and tests in `packages/agent`. +2. Consumer integration patterns are already represented by concrete junior runtime code (`respond.ts`, streaming bridge). +3. Remaining gaps are variant-specific extensions, not blockers for the core integration skill. 
diff --git a/.agents/skills/pi-agent-integration/references/api-surface.md b/.agents/skills/pi-agent-integration/references/api-surface.md new file mode 100644 index 00000000..0c0cee29 --- /dev/null +++ b/.agents/skills/pi-agent-integration/references/api-surface.md @@ -0,0 +1,53 @@ +# API Surface + +Primary package: `@mariozechner/pi-agent-core` + +## Core exports + +- `Agent` class (`src/agent.ts`) +- `agentLoop`, `agentLoopContinue` (`src/agent-loop.ts`) +- `streamProxy` (`src/proxy.ts`) +- Types from `src/types.ts`: `AgentMessage`, `AgentTool`, `AgentEvent`, `AgentState`, `AgentLoopConfig`, `StreamFn` + +## `Agent` constructor options + +- `initialState` (`systemPrompt`, `model`, `thinkingLevel`, `tools`, `messages`) +- `convertToLlm(messages)` for message conversion/filtering +- `transformContext(messages, signal)` for pruning/injection before conversion +- `steeringMode`, `followUpMode` (`"one-at-a-time"` or `"all"`) +- `streamFn` for custom/proxied streaming +- `sessionId`, `getApiKey`, `thinkingBudgets`, `transport`, `maxRetryDelayMs` + +## Core runtime methods + +- Prompting: `prompt(string | AgentMessage | AgentMessage[])`, `continue()` +- Queueing: `steer(message)`, `followUp(message)`, plus clear/dequeue helpers +- State mutation: `setSystemPrompt`, `setModel`, `setThinkingLevel`, `setTools`, `replaceMessages`, `appendMessage`, `clearMessages`, `reset` +- Lifecycle: `abort()`, `waitForIdle()`, `subscribe(listener)` + +## Event contract + +- Lifecycle events: `agent_start`, `turn_start`, `turn_end`, `agent_end` +- Message events: `message_start`, `message_update`, `message_end` +- Tool events: `tool_execution_start`, `tool_execution_update`, `tool_execution_end` +- Streaming text should be read from `message_update` where `assistantMessageEvent.type === "text_delta"` + +## Message pipeline contract + +`AgentMessage[]` -> `transformContext()` -> `convertToLlm()` -> LLM `Message[]` + +- `transformContext`: keep message-level behavior (pruning, external 
context injection) +- `convertToLlm`: convert/filter to provider-compatible `user`/`assistant`/`toolResult` messages + +## Continue/queue semantics + +- `prompt()` and `continue()` throw if `isStreaming` is true. +- `continue()` requires message history and valid tail state. +- If tail is `assistant`, `continue()` resumes only when `steer`/`followUp` messages are queued; with nothing queued it throws. +- Mid-run user input should be queued with `steer` or `followUp`, not re-entered with `prompt`. + +## Version and migration points to check + +- Queue API migration: `queueMessage` replaced by `steer`/`followUp` +- Option migration: `messageTransformer` -> `convertToLlm`, `preprocessor` -> `transformContext` +- Transport abstraction changes: prefer `streamFn` customization for proxy/server routing diff --git a/.agents/skills/pi-agent-integration/references/common-use-cases.md b/.agents/skills/pi-agent-integration/references/common-use-cases.md new file mode 100644 index 00000000..a883cf8e --- /dev/null +++ b/.agents/skills/pi-agent-integration/references/common-use-cases.md @@ -0,0 +1,33 @@ +# Common Use Cases + +Use these patterns when Pi `Agent` is consumed by another library/runtime. + +1. Stream assistant text into another SDK surface: +Use `agent.subscribe` and forward only `message_update` + `text_delta` into an `AsyncIterable` bridge. + +2. Preserve streamed-vs-final output parity: +Insert separators between assistant message boundaries during delta streaming so final joined text matches non-streamed output semantics. + +3. Add custom app messages without leaking them to LLM calls: +Keep custom message types in agent state; filter/convert them in `convertToLlm`. + +4. Prune or augment context safely: +Use `transformContext` for context window control and deterministic context injection before `convertToLlm`. + +5. Support user steering while tools are running: +Use `steer` for interruptions and `followUp` for deferred prompts instead of issuing parallel `prompt()` calls. + +6. 
Implement timeout-controlled turns: +Race prompt execution against a timeout, call `agent.abort()` on timeout, and surface explicit timeout diagnostics. + +7. Resume across session slices/checkpoints: +Restore message history (for example via `replaceMessages`) and call `continue()` with valid tail-state semantics. + +8. Route through backend-proxied model access: +Provide a custom `streamFn` (or `streamProxy`) so auth/provider calls stay server-side while preserving local `Agent` event semantics. + +9. Handle expiring provider tokens: +Use `getApiKey` dynamic resolution for each LLM call instead of static long-lived API keys. + +10. Tune transport/retry constraints: +Set `transport` and `maxRetryDelayMs` intentionally for consumer runtime behavior and bounded latency. diff --git a/.agents/skills/pi-agent-integration/references/troubleshooting-workarounds.md b/.agents/skills/pi-agent-integration/references/troubleshooting-workarounds.md new file mode 100644 index 00000000..aad48a69 --- /dev/null +++ b/.agents/skills/pi-agent-integration/references/troubleshooting-workarounds.md @@ -0,0 +1,50 @@ +# Troubleshooting and Workarounds + +Use this table when Pi-agent integration behavior is wrong in a consumer library. + +| Symptom | Likely cause | Fix | +| --- | --- | --- | +| `prompt()` throws "Agent is already processing a prompt..." 
| Concurrent prompt while `isStreaming` is true | Queue input with `steer`/`followUp` or await existing run completion | +| `continue()` throws while agent is active | `continue()` called during streaming | Wait for idle, then call `continue()` | +| `continue()` throws "No messages to continue from" | Empty message history | Seed context with user/toolResult history before `continue()` | +| `continue()` throws from assistant-tail state | No queued steering/follow-up messages when tail is assistant | Queue `steer`/`followUp` first, or call `prompt()` with new user message | +| Stream shows no text even though turn finishes | Listener filtering wrong event type | Consume `message_update` events with `assistantMessageEvent.type === "text_delta"` | +| Streamed text and final text differ in formatting | Missing boundaries between assistant message segments | Insert explicit separators between message boundaries and normalize downstream | +| Tool call artifacts leak into user-visible output | Consumer is rendering tool calls/tool results directly | Keep tool lifecycle artifacts internal and render only resolved assistant text | +| Custom message roles break provider calls | `convertToLlm` passes non-LLM-compatible roles | Filter/transform to provider-compatible message roles in `convertToLlm` | +| Context pruning removes critical state unexpectedly | `transformContext` is non-deterministic or too aggressive | Make pruning deterministic and test with before/after context assertions | +| Queue order surprises in multi-message steering | Queue mode defaults not explicit | Set `steeringMode`/`followUpMode` intentionally (`one-at-a-time` vs `all`) | +| Timeouts do not cleanly stop UI stream | Timeout path does not call `abort()` and close stream bridge | Abort agent on timeout, always end iterable in `finally` | +| Proxy streaming errors are opaque | Proxy response/event parsing not surfaced | Validate proxy response status/body and emit explicit error diagnostics | + +## 
Issue/fix checklist + +1. Concurrent prompt failure: +Use `steer`/`followUp` during active runs; do not call `prompt` again until idle. + +2. Continue during stream: +Gate `continue()` with `isStreaming`/`waitForIdle()` checks. + +3. Empty continue context: +Load prior `AgentMessage[]` before `continue()` calls. + +4. Assistant-tail continue rejection: +Queue a steering or follow-up message first, or start a fresh prompt. + +5. Missing text deltas: +Filter to `message_update` + `text_delta`; ignore other delta types for user text stream. + +6. Stream/final mismatch: +Insert message-boundary separators and apply identical normalization in streamed and final output paths. + +7. Invalid custom roles at provider boundary: +Map custom messages in `convertToLlm`; keep only provider-compatible roles. + +8. Over-pruned context: +Make `transformContext` deterministic and verify retained messages in tests. + +9. Queue-order surprises: +Set `steeringMode` and `followUpMode` explicitly in wrappers. + +10. Timeout leak: +Always abort and close iterable in `finally` blocks. From 6be69f45036120e2321dd2e33bbedd37293b44d8 Mon Sep 17 00:00:00 2001 From: David Cramer Date: Thu, 5 Mar 2026 16:08:08 -0800 Subject: [PATCH 2/2] feat(junior): Add deferred subagent handoff and deslop tool context Add queued subagent delegation with child-task processing and parent turn continuation so long-running delegated work can complete safely in serverless. Also reduce interface slop by collapsing queue handoff fields into a typed queue context object, removing unused model override plumbing, and gating subagent runs from spawning subagents or using Slack tools. 
Fixes #71 Co-Authored-By: GPT-5 Codex --- packages/junior/src/chat/queue/client.ts | 18 +++ .../src/chat/queue/process-thread-message.ts | 14 +++ packages/junior/src/chat/queue/types.ts | 17 +++ packages/junior/src/chat/respond.ts | 69 +++++++++- .../junior/src/chat/runtime/reply-executor.ts | 28 ++++- packages/junior/src/chat/state.ts | 118 ++++++++++++++++++ .../junior/src/chat/subagent/process-task.ts | 92 ++++++++++++++ packages/junior/src/chat/tools/definition.ts | 5 + packages/junior/src/chat/tools/index.ts | 28 +++-- .../junior/src/chat/tools/task-subagent.ts | 88 +++++++++++++ packages/junior/src/chat/tools/types.ts | 1 + packages/junior/src/chat/turn/errors.ts | 2 +- .../junior/src/handlers/queue-callback.ts | 81 +++++++----- .../queue/queue-callback-route.test.ts | 37 +++++- .../junior/tests/task-subagent-tool.test.ts | 116 +++++++++++++++++ .../process-thread-message-reaction.test.ts | 16 +++ .../unit/slack/tool-registration.test.ts | 18 +++ 17 files changed, 703 insertions(+), 45 deletions(-) create mode 100644 packages/junior/src/chat/subagent/process-task.ts create mode 100644 packages/junior/src/chat/tools/task-subagent.ts create mode 100644 packages/junior/tests/task-subagent-tool.test.ts diff --git a/packages/junior/src/chat/queue/client.ts b/packages/junior/src/chat/queue/client.ts index 44b57247..34302fc1 100644 --- a/packages/junior/src/chat/queue/client.ts +++ b/packages/junior/src/chat/queue/client.ts @@ -1,6 +1,7 @@ import { handleCallback, send } from "@vercel/queue"; const DEFAULT_TOPIC_NAME = "junior-thread-message"; +const SUBAGENT_TASK_TOPIC_NAME = "junior-subagent-task"; const MAX_DELIVERY_ATTEMPTS = 10; @@ -8,6 +9,10 @@ export function getThreadMessageTopic(): string { return DEFAULT_TOPIC_NAME; } +export function getSubagentTaskTopic(): string { + return SUBAGENT_TASK_TOPIC_NAME; +} + export async function enqueueThreadMessage( payload: unknown, options?: { @@ -21,6 +26,19 @@ export async function enqueueThreadMessage( return 
result.messageId ?? undefined; } +export async function enqueueSubagentTask( + payload: unknown, + options?: { + idempotencyKey?: string; + } +): Promise { + const result = await send(getSubagentTaskTopic(), payload, { + ...(options?.idempotencyKey ? { idempotencyKey: options.idempotencyKey } : {}) + }); + + return result.messageId ?? undefined; +} + export function createQueueCallbackHandler( handler: (message: T, metadata: { messageId: string; deliveryCount: number; topicName: string }) => Promise ): (request: Request) => Promise { diff --git a/packages/junior/src/chat/queue/process-thread-message.ts b/packages/junior/src/chat/queue/process-thread-message.ts index 8a549adc..4ee8d4f2 100644 --- a/packages/junior/src/chat/queue/process-thread-message.ts +++ b/packages/junior/src/chat/queue/process-thread-message.ts @@ -11,6 +11,7 @@ import { } from "@/chat/state"; import { processThreadMessageRuntime } from "@/chat/thread-runtime/process-thread-message-runtime"; import type { ThreadMessagePayload } from "@/chat/queue/types"; +import { isRetryableTurnError } from "@/chat/turn/errors"; let stateAdapterConnected = false; @@ -186,6 +187,19 @@ export async function processQueuedThreadMessage( throw new QueueMessageOwnershipError("complete", payload.dedupKey); } } catch (error) { + if (isRetryableTurnError(error, "subagent_task_deferred")) { + const completed = await completeQueueMessageProcessingOwnership({ + rawKey: payload.dedupKey, + ownerToken, + queueMessageId: payload.queueMessageId + }); + + if (!completed) { + throw new QueueMessageOwnershipError("complete", payload.dedupKey); + } + return; + } + const errorMessage = error instanceof Error ? 
error.message : String(error); await logThreadMessageFailure(payload, errorMessage); diff --git a/packages/junior/src/chat/queue/types.ts b/packages/junior/src/chat/queue/types.ts index 391e861a..95dca989 100644 --- a/packages/junior/src/chat/queue/types.ts +++ b/packages/junior/src/chat/queue/types.ts @@ -2,6 +2,13 @@ import type { Message, SerializedMessage, SerializedThread, Thread } from "chat" export type ThreadMessageKind = "new_mention" | "subscribed_message" | "subscribed_reply"; +export interface QueueResumeContext { + dedupKey: string; + message: Message | SerializedMessage; + normalizedThreadId: string; + thread: Thread | SerializedThread; +} + export interface ThreadMessagePayload { dedupKey: string; kind: ThreadMessageKind; @@ -10,3 +17,13 @@ export interface ThreadMessagePayload { thread: Thread | SerializedThread; queueMessageId?: string; } + +export interface SubagentTaskPayload { + callKey: string; + conversationId: string; + sessionId: string; + task: string; + queueContext: QueueResumeContext; +} + +export type QueuePayload = ThreadMessagePayload | SubagentTaskPayload; diff --git a/packages/junior/src/chat/respond.ts b/packages/junior/src/chat/respond.ts index 250bf48a..5923aac8 100644 --- a/packages/junior/src/chat/respond.ts +++ b/packages/junior/src/chat/respond.ts @@ -28,6 +28,7 @@ import { SlackActionError } from "@/chat/slack-actions/client"; import type { ThreadArtifactsState } from "@/chat/slack-actions/types"; import { createTools } from "@/chat/tools"; import type { ToolDefinition } from "@/chat/tools/definition"; +import type { QueueResumeContext } from "@/chat/queue/types"; import { GEN_AI_PROVIDER_NAME, getGatewayApiKey, resolveGatewayModel } from "@/chat/pi/client"; import { createSandboxExecutor, type SandboxExecutor } from "@/chat/sandbox/sandbox"; import { getRuntimeMetadata } from "@/chat/runtime-metadata"; @@ -80,6 +81,8 @@ export interface ReplyRequestContext { }; onStatus?: (status: string) => void | Promise; onTextDelta?: 
(deltaText: string) => void | Promise; + queueContext?: QueueResumeContext; + isSubagentExecution?: boolean; } export interface AssistantReply { @@ -221,6 +224,7 @@ function formatToolStatus(toolName: string): string { writeFile: "Writing file in sandbox", webSearch: "Searching public sources", webFetch: "Reading source pages", + taskSubagent: "Delegating task to subagent queue", slackChannelPostMessage: "Posting message to channel", slackMessageAddReaction: "Adding emoji reaction", slackChannelListMembers: "Listing channel members", @@ -468,6 +472,11 @@ function createAgentTools( tools: Record>, sandbox: SkillSandbox, spanContext: ObservabilityContext, + turnContext: { + conversationId?: string; + sessionId?: string; + queueContext?: QueueResumeContext; + }, onStatus?: (status: string) => void | Promise, sandboxExecutor?: SandboxExecutor, capabilityRuntime?: SkillCapabilityRuntime, @@ -627,7 +636,11 @@ function createAgentTools( : parsed }) : await toolDef.execute(parsed as never, { - experimental_context: sandbox + experimental_context: sandbox, + toolCallId: normalizedToolCallId, + conversationId: turnContext.conversationId, + sessionId: turnContext.sessionId, + queueContext: turnContext.queueContext }); const resultDetails = sandboxExecutor?.canExecute(toolName) && result && typeof result === "object" && "result" in result @@ -735,6 +748,7 @@ export async function generateAssistantReply( let timeoutResumeSessionId: string | undefined; let timeoutResumeSliceId = 1; let timeoutResumeMessages: unknown[] = []; + let latestPiMessages: unknown[] = []; try { const shouldTrace = shouldEmitDevAgentTrace(); @@ -902,6 +916,7 @@ export async function generateAssistantReply( }, { channelId: context.toolChannelId ?? 
context.correlation?.channelId, + isSubagentExecution: context.isSubagentExecution, messageTs: context.correlation?.messageTs, threadTs: context.correlation?.threadTs, userText: userInput, @@ -977,6 +992,11 @@ export async function generateAssistantReply( tools as Record>, skillSandbox, spanContext, + { + conversationId: sessionConversationId, + sessionId, + queueContext: context.queueContext + }, context.onStatus, sandboxExecutor, capabilityRuntime, @@ -1108,6 +1128,7 @@ export async function generateAssistantReply( } ); } finally { + latestPiMessages = [...(agent.state.messages as unknown[])]; unsubscribe(); } @@ -1300,6 +1321,52 @@ export async function generateAssistantReply( ); } + if ( + isRetryableTurnError(error, "subagent_task_deferred") && + timeoutResumeConversationId && + timeoutResumeSessionId + ) { + const nextSliceId = timeoutResumeSliceId + 1; + try { + await upsertAgentTurnSessionCheckpoint({ + conversationId: timeoutResumeConversationId, + sessionId: timeoutResumeSessionId, + sliceId: nextSliceId, + state: "awaiting_resume", + piMessages: + latestPiMessages.length > 0 + ? 
latestPiMessages + : timeoutResumeMessages, + resumedFromSliceId: timeoutResumeSliceId, + errorMessage: error.message + }); + } catch (checkpointError) { + logException( + checkpointError, + "subagent_deferred_resume_checkpoint_failed", + { + slackThreadId: context.correlation?.threadId, + slackUserId: context.correlation?.requesterId, + slackChannelId: context.correlation?.channelId, + runId: context.correlation?.runId, + assistantUserName: context.assistant?.userName, + modelId: botConfig.modelId + }, + { + "app.ai.resume_conversation_id": timeoutResumeConversationId, + "app.ai.resume_session_id": timeoutResumeSessionId, + "app.ai.resume_from_slice_id": timeoutResumeSliceId, + "app.ai.resume_next_slice_id": nextSliceId + }, + "Failed to persist deferred subagent checkpoint" + ); + } + throw new RetryableTurnError( + "subagent_task_deferred", + `conversation=${timeoutResumeConversationId} session=${timeoutResumeSessionId} slice=${nextSliceId}` + ); + } + if (isRetryableTurnError(error)) { throw error; } diff --git a/packages/junior/src/chat/runtime/reply-executor.ts b/packages/junior/src/chat/runtime/reply-executor.ts index 79aab27d..5e7dffdc 100644 --- a/packages/junior/src/chat/runtime/reply-executor.ts +++ b/packages/junior/src/chat/runtime/reply-executor.ts @@ -16,6 +16,7 @@ import { generateThreadTitle, markConversationMessage, normalizeConversationText import { resolveUserAttachments } from "@/chat/services/vision-context"; import { isDmChannel } from "@/chat/slack-actions/client"; import { type ThreadArtifactsState } from "@/chat/slack-actions/types"; +import type { ThreadMessagePayload } from "@/chat/queue/types"; import { resolveReplyDelivery } from "@/chat/turn/execute"; import { isRetryableTurnError } from "@/chat/turn/errors"; import { markTurnCompleted, markTurnFailed } from "@/chat/turn/persist"; @@ -26,6 +27,22 @@ function buildDeterministicTurnId(messageId: string): string { return `turn_${sanitized}`; } +function buildQueueResumePayload(thread: 
Thread, message: Message, threadId: string): ThreadMessagePayload { + const threadLike = thread as unknown as { toJSON?: () => ThreadMessagePayload["thread"] }; + const messageLike = message as unknown as { toJSON?: () => ThreadMessagePayload["message"] }; + return { + dedupKey: `${threadId}:${message.id}`, + kind: "subscribed_reply", + normalizedThreadId: threadId, + thread: typeof threadLike.toJSON === "function" + ? threadLike.toJSON() + : (thread as unknown as ThreadMessagePayload["thread"]), + message: typeof messageLike.toJSON === "function" + ? messageLike.toJSON() + : (message as unknown as ThreadMessagePayload["message"]) + }; +} + interface ReplyExecutorDeps { getSlackAdapter: () => SlackAdapter; prepareTurnState: (args: { @@ -61,7 +78,8 @@ export function createReplyToThread(deps: ReplyExecutorDeps) { const threadTs = getThreadTs(threadId); const messageTs = getMessageTs(message); const runId = getRunId(thread, message); - const conversationId = threadId ?? runId; + const conversationId = threadId ?? runId; + const queueResumePayload = threadId ? buildQueueResumePayload(thread, message, threadId) : undefined; await withSpan( "chat.reply", @@ -207,6 +225,14 @@ export function createReplyToThread(deps: ReplyExecutorDeps) { channelId, requesterId: message.author.userId }, + queueContext: queueResumePayload + ? 
{ + dedupKey: queueResumePayload.dedupKey, + normalizedThreadId: queueResumePayload.normalizedThreadId, + message: queueResumePayload.message, + thread: queueResumePayload.thread + } + : undefined, toolChannelId, sandbox: { sandboxId: preparedState.sandboxId diff --git a/packages/junior/src/chat/state.ts b/packages/junior/src/chat/state.ts index ad7d7007..cff7c5d0 100644 --- a/packages/junior/src/chat/state.ts +++ b/packages/junior/src/chat/state.ts @@ -1,16 +1,19 @@ import { createRedisState } from "@chat-adapter/state-redis"; import type { RedisStateAdapter } from "@chat-adapter/state-redis"; import type { Lock, StateAdapter } from "chat"; +import type { QueueResumeContext } from "@/chat/queue/types"; import { hasRedisConfig } from "@/chat/config"; const MIN_LOCK_TTL_MS = 1000 * 60 * 5; const QUEUE_INGRESS_DEDUP_PREFIX = "junior:queue_ingress"; const QUEUE_MESSAGE_PROCESSING_PREFIX = "junior:queue_message"; const AGENT_TURN_SESSION_PREFIX = "junior:agent_turn_session"; +const SUBAGENT_TASK_PREFIX = "junior:subagent_task"; const QUEUE_MESSAGE_PROCESSING_TTL_MS = 30 * 60 * 1000; const QUEUE_MESSAGE_COMPLETED_TTL_MS = 7 * 24 * 60 * 60 * 1000; const QUEUE_MESSAGE_FAILED_TTL_MS = 6 * 60 * 60 * 1000; const AGENT_TURN_SESSION_TTL_MS = 24 * 60 * 60 * 1000; +const SUBAGENT_TASK_TTL_MS = 24 * 60 * 60 * 1000; const CLAIM_OR_RECLAIM_PROCESSING_SCRIPT = ` local key = KEYS[1] local nowMs = tonumber(ARGV[1]) @@ -129,6 +132,7 @@ function getRedisStateAdapter(): RedisStateAdapter { export type QueueMessageProcessingStatus = "processing" | "completed" | "failed"; export type AgentTurnSessionStatus = "running" | "awaiting_resume" | "completed" | "failed"; +export type SubagentTaskStatus = "queued" | "running" | "completed" | "failed"; export interface QueueMessageProcessingState { status: QueueMessageProcessingStatus; @@ -150,6 +154,21 @@ export interface AgentTurnSessionCheckpoint { updatedAtMs: number; } +export interface SubagentTaskRecord { + callKey: string; + conversationId: 
string; + dedupKey: string; + message: QueueResumeContext["message"]; + normalizedThreadId: string; + resultText?: string; + sessionId: string; + status: SubagentTaskStatus; + task: string; + thread: QueueResumeContext["thread"]; + updatedAtMs: number; + errorMessage?: string; +} + function queueMessageKey(rawKey: string): string { return `${QUEUE_MESSAGE_PROCESSING_PREFIX}:${rawKey}`; } @@ -184,6 +203,10 @@ function agentTurnSessionKey(conversationId: string, sessionId: string): string return `${AGENT_TURN_SESSION_PREFIX}:${conversationId}:${sessionId}`; } +function subagentTaskKey(callKey: string): string { + return `${SUBAGENT_TASK_PREFIX}:${callKey}`; +} + function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null; } @@ -235,6 +258,60 @@ function parseAgentTurnSessionCheckpoint(value: unknown): AgentTurnSessionCheckp } } +function parseSubagentTaskRecord(value: unknown): SubagentTaskRecord | undefined { + if (typeof value !== "string") { + return undefined; + } + + try { + const parsed = JSON.parse(value) as Record; + if (!isRecord(parsed)) { + return undefined; + } + + const status = parsed.status; + if (status !== "queued" && status !== "running" && status !== "completed" && status !== "failed") { + return undefined; + } + + const callKey = parsed.callKey; + const task = parsed.task; + const conversationId = parsed.conversationId; + const sessionId = parsed.sessionId; + const dedupKey = parsed.dedupKey; + const normalizedThreadId = parsed.normalizedThreadId; + const updatedAtMs = parsed.updatedAtMs; + if ( + typeof callKey !== "string" || + typeof task !== "string" || + typeof conversationId !== "string" || + typeof sessionId !== "string" || + typeof dedupKey !== "string" || + typeof normalizedThreadId !== "string" || + typeof updatedAtMs !== "number" + ) { + return undefined; + } + + return { + callKey, + task, + conversationId, + sessionId, + dedupKey, + normalizedThreadId, + status, + updatedAtMs, + message: 
parsed.message as QueueResumeContext["message"], + thread: parsed.thread as QueueResumeContext["thread"], + ...(typeof parsed.resultText === "string" ? { resultText: parsed.resultText } : {}), + ...(typeof parsed.errorMessage === "string" ? { errorMessage: parsed.errorMessage } : {}) + }; + } catch { + return undefined; + } +} + export function getStateAdapter(): StateAdapter { if (!_stateAdapter) { _stateAdapter = createStateAdapter(); @@ -395,3 +472,44 @@ export async function upsertAgentTurnSessionCheckpoint(args: { await getStateAdapter().set(agentTurnSessionKey(args.conversationId, args.sessionId), JSON.stringify(checkpoint), ttlMs); return checkpoint; } + +export async function getSubagentTaskRecord(callKey: string): Promise { + await getStateAdapter().connect(); + const value = await getStateAdapter().get(subagentTaskKey(callKey)); + return parseSubagentTaskRecord(value); +} + +export async function upsertSubagentTaskRecord(args: { + callKey: string; + conversationId: string; + dedupKey: string; + message: QueueResumeContext["message"]; + normalizedThreadId: string; + resultText?: string; + sessionId: string; + status: SubagentTaskStatus; + task: string; + thread: QueueResumeContext["thread"]; + errorMessage?: string; + ttlMs?: number; +}): Promise { + await getStateAdapter().connect(); + const record: SubagentTaskRecord = { + callKey: args.callKey, + conversationId: args.conversationId, + dedupKey: args.dedupKey, + message: args.message, + normalizedThreadId: args.normalizedThreadId, + sessionId: args.sessionId, + status: args.status, + task: args.task, + thread: args.thread, + updatedAtMs: Date.now(), + ...(args.resultText ? { resultText: args.resultText } : {}), + ...(args.errorMessage ? { errorMessage: args.errorMessage } : {}) + }; + + const ttlMs = Math.max(1, args.ttlMs ?? 
SUBAGENT_TASK_TTL_MS); + await getStateAdapter().set(subagentTaskKey(args.callKey), JSON.stringify(record), ttlMs); + return record; +} diff --git a/packages/junior/src/chat/subagent/process-task.ts b/packages/junior/src/chat/subagent/process-task.ts new file mode 100644 index 00000000..a13fa0bf --- /dev/null +++ b/packages/junior/src/chat/subagent/process-task.ts @@ -0,0 +1,92 @@ +import { enqueueThreadMessage } from "@/chat/queue/client"; +import type { SubagentTaskPayload, ThreadMessagePayload } from "@/chat/queue/types"; +import { generateAssistantReply } from "@/chat/respond"; +import { getSubagentTaskRecord, upsertSubagentTaskRecord } from "@/chat/state"; + +function buildResumeDedupKey(payload: SubagentTaskPayload): string { + return `${payload.queueContext.dedupKey}:subagent:${payload.callKey}`; +} + +export async function processSubagentTask(payload: SubagentTaskPayload): Promise { + const existing = await getSubagentTaskRecord(payload.callKey); + if (existing?.status === "completed" || existing?.status === "failed") { + return; + } + + await upsertSubagentTaskRecord({ + callKey: payload.callKey, + conversationId: payload.conversationId, + sessionId: payload.sessionId, + dedupKey: payload.queueContext.dedupKey, + normalizedThreadId: payload.queueContext.normalizedThreadId, + task: payload.task, + status: "running", + message: payload.queueContext.message, + thread: payload.queueContext.thread + }); + + try { + const reply = await generateAssistantReply(payload.task, { + requester: { + userId: payload.queueContext.message.author?.userId, + userName: payload.queueContext.message.author?.userName, + fullName: payload.queueContext.message.author?.fullName + }, + correlation: { + conversationId: payload.conversationId, + threadId: payload.queueContext.normalizedThreadId, + turnId: payload.sessionId, + channelId: payload.queueContext.thread.channelId, + messageTs: payload.queueContext.message.id, + threadTs: payload.queueContext.message.threadId, + requesterId: 
payload.queueContext.message.author?.userId + }, + toolChannelId: payload.queueContext.thread.channelId, + queueContext: payload.queueContext, + isSubagentExecution: true + }); + + await upsertSubagentTaskRecord({ + callKey: payload.callKey, + conversationId: payload.conversationId, + sessionId: payload.sessionId, + dedupKey: payload.queueContext.dedupKey, + normalizedThreadId: payload.queueContext.normalizedThreadId, + task: payload.task, + status: "completed", + resultText: reply.text, + message: payload.queueContext.message, + thread: payload.queueContext.thread + }); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + await upsertSubagentTaskRecord({ + callKey: payload.callKey, + conversationId: payload.conversationId, + sessionId: payload.sessionId, + dedupKey: payload.queueContext.dedupKey, + normalizedThreadId: payload.queueContext.normalizedThreadId, + task: payload.task, + status: "failed", + errorMessage, + message: payload.queueContext.message, + thread: payload.queueContext.thread + }); + } + + const stored = await getSubagentTaskRecord(payload.callKey); + if (!stored) { + return; + } + + const resumePayload: ThreadMessagePayload = { + dedupKey: buildResumeDedupKey(payload), + kind: "subscribed_reply", + normalizedThreadId: stored.normalizedThreadId, + message: stored.message, + thread: stored.thread + }; + await enqueueThreadMessage(resumePayload, { + idempotencyKey: `subagent-resume:${payload.callKey}` + }); +} diff --git a/packages/junior/src/chat/tools/definition.ts b/packages/junior/src/chat/tools/definition.ts index 7d23135e..124635c2 100644 --- a/packages/junior/src/chat/tools/definition.ts +++ b/packages/junior/src/chat/tools/definition.ts @@ -1,7 +1,12 @@ import type { Static, TSchema } from "@sinclair/typebox"; +import type { QueueResumeContext } from "@/chat/queue/types"; export interface ToolCallOptions { experimental_context?: unknown; + toolCallId?: string; + conversationId?: string; + 
sessionId?: string; + queueContext?: QueueResumeContext; } export interface ToolDefinition { diff --git a/packages/junior/src/chat/tools/index.ts b/packages/junior/src/chat/tools/index.ts index 86bb5c22..8740b35c 100644 --- a/packages/junior/src/chat/tools/index.ts +++ b/packages/junior/src/chat/tools/index.ts @@ -14,6 +14,7 @@ import { createSlackListCreateTool } from "@/chat/tools/slack-list-create"; import { createSlackListGetItemsTool } from "@/chat/tools/slack-list-get-items"; import { createSlackListUpdateItemTool } from "@/chat/tools/slack-list-update-item"; import { createSystemTimeTool } from "@/chat/tools/system-time"; +import { createTaskSubagentTool } from "@/chat/tools/task-subagent"; import type { ToolHooks, ToolRuntimeContext, ToolState } from "@/chat/tools/types"; import { createWebFetchTool } from "@/chat/tools/web-fetch"; import { createWebSearchTool } from "@/chat/tools/web-search"; @@ -95,6 +96,7 @@ export function createTools( hooks: ToolHooks = {}, context: ToolRuntimeContext ) { + const isSubagentExecution = context.isSubagentExecution === true; const state = createToolState(hooks, context); const tools: Record = { loadSkill: wrapToolExecution( @@ -110,19 +112,23 @@ export function createTools( writeFile: wrapToolExecution("writeFile", createWriteFileTool(), hooks), webSearch: wrapToolExecution("webSearch", createWebSearchTool(), hooks), webFetch: wrapToolExecution("webFetch", createWebFetchTool(hooks), hooks), - imageGenerate: wrapToolExecution("imageGenerate", createImageGenerateTool(hooks), hooks), - slackCanvasUpdate: wrapToolExecution("slackCanvasUpdate", createSlackCanvasUpdateTool(state, context), hooks), - slackListCreate: wrapToolExecution("slackListCreate", createSlackListCreateTool(state), hooks), - slackListAddItems: wrapToolExecution("slackListAddItems", createSlackListAddItemsTool(state), hooks), - slackListGetItems: wrapToolExecution("slackListGetItems", createSlackListGetItemsTool(state), hooks), - slackListUpdateItem: 
wrapToolExecution( + imageGenerate: wrapToolExecution("imageGenerate", createImageGenerateTool(hooks), hooks) + }; + + if (!isSubagentExecution) { + tools.taskSubagent = wrapToolExecution("taskSubagent", createTaskSubagentTool(), hooks); + tools.slackCanvasUpdate = wrapToolExecution("slackCanvasUpdate", createSlackCanvasUpdateTool(state, context), hooks); + tools.slackListCreate = wrapToolExecution("slackListCreate", createSlackListCreateTool(state), hooks); + tools.slackListAddItems = wrapToolExecution("slackListAddItems", createSlackListAddItemsTool(state), hooks); + tools.slackListGetItems = wrapToolExecution("slackListGetItems", createSlackListGetItemsTool(state), hooks); + tools.slackListUpdateItem = wrapToolExecution( "slackListUpdateItem", createSlackListUpdateItemTool(state), hooks - ) - }; + ); + } - if (isConversationScopedChannel(context.channelId)) { + if (!isSubagentExecution && isConversationScopedChannel(context.channelId)) { tools.slackCanvasCreate = wrapToolExecution( "slackCanvasCreate", createSlackCanvasCreateTool(context, state), @@ -130,7 +136,7 @@ export function createTools( ); } - if (isConversationChannel(context.channelId)) { + if (!isSubagentExecution && isConversationChannel(context.channelId)) { tools.slackChannelPostMessage = wrapToolExecution( "slackChannelPostMessage", createSlackChannelPostMessageTool(context, state), @@ -148,7 +154,7 @@ export function createTools( ); } - if (isConversationScopedChannel(context.channelId)) { + if (!isSubagentExecution && isConversationScopedChannel(context.channelId)) { tools.slackMessageAddReaction = wrapToolExecution( "slackMessageAddReaction", createSlackMessageAddReactionTool(context, state), diff --git a/packages/junior/src/chat/tools/task-subagent.ts b/packages/junior/src/chat/tools/task-subagent.ts new file mode 100644 index 00000000..e1211dcb --- /dev/null +++ b/packages/junior/src/chat/tools/task-subagent.ts @@ -0,0 +1,88 @@ +import { Type } from "@sinclair/typebox"; +import { 
enqueueSubagentTask } from "@/chat/queue/client"; +import type { QueueResumeContext, SubagentTaskPayload } from "@/chat/queue/types"; +import { getSubagentTaskRecord, upsertSubagentTaskRecord } from "@/chat/state"; +import { tool } from "@/chat/tools/definition"; +import { RetryableTurnError } from "@/chat/turn/errors"; + +function buildSubagentCallKey(input: { + conversationId?: string; + sessionId?: string; + toolCallId?: string; +}): string | undefined { + if (!input.conversationId || !input.sessionId || !input.toolCallId) { + return undefined; + } + return `${input.conversationId}:${input.sessionId}:${input.toolCallId}`; +} + +function requireQueueContext(value: QueueResumeContext | undefined): QueueResumeContext { + if (!value) { + throw new Error("taskSubagent requires queue message context"); + } + return value; +} + +export function createTaskSubagentTool() { + return tool({ + description: + "Delegate a scoped task to a background subagent worker. Use this for longer recon/planning/review tasks that may outlive one serverless turn.", + inputSchema: Type.Object({ + task: Type.String({ + minLength: 1, + maxLength: 8_000, + description: "Delegated task instructions." + }) + }), + execute: async ({ task }, options) => { + const callKey = buildSubagentCallKey({ + conversationId: options.conversationId, + sessionId: options.sessionId, + toolCallId: options.toolCallId + }); + if (!callKey) { + throw new Error("taskSubagent requires conversation/session/tool call identifiers"); + } + const queueContext = requireQueueContext(options.queueContext); + + const existing = await getSubagentTaskRecord(callKey); + if (existing?.status === "completed") { + return { + ok: true, + call_key: callKey, + status: existing.status, + output: existing.resultText ?? "" + }; + } + if (existing?.status === "failed") { + throw new Error(existing.errorMessage ?? 
"Delegated subagent task failed"); + } + + if (!existing) { + const payload: SubagentTaskPayload = { + callKey, + conversationId: options.conversationId!, + sessionId: options.sessionId!, + task, + queueContext + }; + + await upsertSubagentTaskRecord({ + callKey, + conversationId: payload.conversationId, + sessionId: payload.sessionId, + dedupKey: payload.queueContext.dedupKey, + normalizedThreadId: payload.queueContext.normalizedThreadId, + task: payload.task, + status: "queued", + message: payload.queueContext.message, + thread: payload.queueContext.thread + }); + + await enqueueSubagentTask(payload, { idempotencyKey: callKey }); + } + + throw new RetryableTurnError("subagent_task_deferred", `subagent task pending call_key=${callKey}`); + } + }); +} diff --git a/packages/junior/src/chat/tools/types.ts b/packages/junior/src/chat/tools/types.ts index 4dd439fe..409fce08 100644 --- a/packages/junior/src/chat/tools/types.ts +++ b/packages/junior/src/chat/tools/types.ts @@ -13,6 +13,7 @@ export interface ToolHooks { export interface ToolRuntimeContext { channelId?: string; + isSubagentExecution?: boolean; messageTs?: string; threadTs?: string; userText?: string; diff --git a/packages/junior/src/chat/turn/errors.ts b/packages/junior/src/chat/turn/errors.ts index 38f67cf0..3bfdc77d 100644 --- a/packages/junior/src/chat/turn/errors.ts +++ b/packages/junior/src/chat/turn/errors.ts @@ -1,4 +1,4 @@ -export type RetryableTurnReason = "agent_turn_timeout_resume"; +export type RetryableTurnReason = "agent_turn_timeout_resume" | "subagent_task_deferred"; export class RetryableTurnError extends Error { readonly code = "retryable_turn"; diff --git a/packages/junior/src/handlers/queue-callback.ts b/packages/junior/src/handlers/queue-callback.ts index bce1e434..3f1d81b4 100644 --- a/packages/junior/src/handlers/queue-callback.ts +++ b/packages/junior/src/handlers/queue-callback.ts @@ -1,6 +1,7 @@ -import { createQueueCallbackHandler, getThreadMessageTopic } from 
"@/chat/queue/client"; +import { createQueueCallbackHandler, getSubagentTaskTopic, getThreadMessageTopic } from "@/chat/queue/client"; import { processQueuedThreadMessage } from "@/chat/queue/process-thread-message"; -import type { ThreadMessagePayload } from "@/chat/queue/types"; +import { processSubagentTask } from "@/chat/subagent/process-task"; +import type { QueuePayload, SubagentTaskPayload, ThreadMessagePayload } from "@/chat/queue/types"; import { createRequestContext, logError, @@ -10,35 +11,59 @@ import { withSpan } from "@/chat/observability"; -const callbackHandler = createQueueCallbackHandler(async (message, metadata) => { - const payload = { - ...message, - queueMessageId: metadata.messageId - } satisfies ThreadMessagePayload; +const callbackHandler = createQueueCallbackHandler(async (message, metadata) => { + if (metadata.topicName === getThreadMessageTopic()) { + const payload = { + ...(message as ThreadMessagePayload), + queueMessageId: metadata.messageId + } satisfies ThreadMessagePayload; - if (metadata.topicName !== getThreadMessageTopic()) { - throw new Error(`Unexpected queue topic: ${metadata.topicName}`); + await withSpan( + "queue.process_message", + "queue.process_message", + { + slackThreadId: payload.normalizedThreadId, + slackChannelId: payload.thread.channelId, + slackUserId: payload.message.author?.userId + }, + async () => { + await processQueuedThreadMessage(payload); + }, + { + "messaging.message.id": payload.message.id, + "app.queue.message_kind": payload.kind, + "app.queue.message_id": payload.queueMessageId, + "app.queue.delivery_count": metadata.deliveryCount, + "app.queue.topic": metadata.topicName + } + ); + return; } - await withSpan( - "queue.process_message", - "queue.process_message", - { - slackThreadId: payload.normalizedThreadId, - slackChannelId: payload.thread.channelId, - slackUserId: payload.message.author?.userId - }, - async () => { - await processQueuedThreadMessage(payload); - }, - { - "messaging.message.id": 
payload.message.id, - "app.queue.message_kind": payload.kind, - "app.queue.message_id": payload.queueMessageId, - "app.queue.delivery_count": metadata.deliveryCount, - "app.queue.topic": metadata.topicName - } - ); + if (metadata.topicName === getSubagentTaskTopic()) { + const payload = message as SubagentTaskPayload; + await withSpan( + "queue.process_subagent_task", + "queue.process_subagent_task", + { + slackThreadId: payload.queueContext.normalizedThreadId, + slackChannelId: payload.queueContext.thread.channelId, + slackUserId: payload.queueContext.message.author?.userId + }, + async () => { + await processSubagentTask(payload); + }, + { + "app.queue.message_id": metadata.messageId, + "app.queue.delivery_count": metadata.deliveryCount, + "app.queue.topic": metadata.topicName, + "app.ai.subagent.call_key": payload.callKey + } + ); + return; + } + + throw new Error(`Unexpected queue topic: ${metadata.topicName}`); }); export async function POST(request: Request): Promise { diff --git a/packages/junior/tests/integration/queue/queue-callback-route.test.ts b/packages/junior/tests/integration/queue/queue-callback-route.test.ts index 5baf201d..68ec420c 100644 --- a/packages/junior/tests/integration/queue/queue-callback-route.test.ts +++ b/packages/junior/tests/integration/queue/queue-callback-route.test.ts @@ -1,11 +1,13 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import type { ThreadMessagePayload } from "@/chat/queue/types"; +import type { QueuePayload, ThreadMessagePayload } from "@/chat/queue/types"; const { processQueuedThreadMessageMock, + processSubagentTaskMock, callbackTopicRef } = vi.hoisted(() => ({ processQueuedThreadMessageMock: vi.fn(async () => undefined), + processSubagentTaskMock: vi.fn(async () => undefined), callbackTopicRef: { value: "junior-thread-message" } })); @@ -13,12 +15,17 @@ vi.mock("@/chat/queue/process-thread-message", () => ({ processQueuedThreadMessage: processQueuedThreadMessageMock })); 
+vi.mock("@/chat/subagent/process-task", () => ({ + processSubagentTask: processSubagentTaskMock +})); + vi.mock("@/chat/queue/client", () => ({ getThreadMessageTopic: () => "junior-thread-message", + getSubagentTaskTopic: () => "junior-subagent-task", createQueueCallbackHandler: - (handler: (payload: ThreadMessagePayload, meta: { messageId: string; deliveryCount: number; topicName: string }) => Promise) => + (handler: (payload: QueuePayload, meta: { messageId: string; deliveryCount: number; topicName: string }) => Promise) => async (_request: Request) => { - const payload: ThreadMessagePayload = { + const threadPayload: ThreadMessagePayload = { dedupKey: "slack:C123:1700000000.100:1700000000.200", kind: "new_mention", normalizedThreadId: "slack:C123:1700000000.100", @@ -47,6 +54,20 @@ vi.mock("@/chat/queue/client", () => ({ metadata: { dateSent: new Date().toISOString(), edited: false } } }; + const payload: QueuePayload = callbackTopicRef.value === "junior-subagent-task" + ? { + callKey: "conv-1:turn-1:tool-1", + conversationId: "conv-1", + sessionId: "turn-1", + task: "hello", + queueContext: { + dedupKey: threadPayload.dedupKey, + normalizedThreadId: threadPayload.normalizedThreadId, + message: threadPayload.message, + thread: threadPayload.thread + } + } + : threadPayload; await handler(payload, { messageId: "msg_123", @@ -63,6 +84,7 @@ import { POST } from "@/handlers/queue-callback"; describe("queue callback route", () => { beforeEach(() => { processQueuedThreadMessageMock.mockClear(); + processSubagentTaskMock.mockClear(); callbackTopicRef.value = "junior-thread-message"; }); @@ -87,4 +109,13 @@ describe("queue callback route", () => { ); expect(processQueuedThreadMessageMock).not.toHaveBeenCalled(); }); + + it("processes subagent task topic payloads", async () => { + callbackTopicRef.value = "junior-subagent-task"; + + await POST(new Request("http://localhost/api/queue/callback", { method: "POST" })); + + 
expect(processSubagentTaskMock).toHaveBeenCalledTimes(1); + expect(processQueuedThreadMessageMock).not.toHaveBeenCalled(); + }); }); diff --git a/packages/junior/tests/task-subagent-tool.test.ts b/packages/junior/tests/task-subagent-tool.test.ts new file mode 100644 index 00000000..0c6707a1 --- /dev/null +++ b/packages/junior/tests/task-subagent-tool.test.ts @@ -0,0 +1,116 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { RetryableTurnError } from "@/chat/turn/errors"; +import type { ToolCallOptions } from "@/chat/tools/definition"; + +const { + getSubagentTaskRecordMock, + upsertSubagentTaskRecordMock, + enqueueSubagentTaskMock +} = vi.hoisted(() => ({ + getSubagentTaskRecordMock: vi.fn(), + upsertSubagentTaskRecordMock: vi.fn(async () => undefined), + enqueueSubagentTaskMock: vi.fn(async () => "subagent-msg-1") +})); + +vi.mock("@/chat/state", () => ({ + getSubagentTaskRecord: getSubagentTaskRecordMock, + upsertSubagentTaskRecord: upsertSubagentTaskRecordMock +})); + +vi.mock("@/chat/queue/client", () => ({ + enqueueSubagentTask: enqueueSubagentTaskMock +})); + +import { createTaskSubagentTool } from "@/chat/tools/task-subagent"; + +function baseOptions(): ToolCallOptions { + const queueContext = { + dedupKey: "slack:C123:1700000000.100:1700000000.200", + normalizedThreadId: "slack:C123:1700000000.100", + message: { + _type: "chat:Message" as const, + id: "1700000000.200", + threadId: "slack:C123:1700000000.100", + text: "hello", + formatted: { type: "root" as const, children: [] }, + raw: "hello", + author: { + userId: "U_TEST", + userName: "test-user", + fullName: "Test User", + isBot: false, + isMe: false + }, + attachments: [], + metadata: { dateSent: new Date().toISOString(), edited: false } + }, + thread: { + _type: "chat:Thread" as const, + id: "slack:C123:1700000000.100", + channelId: "C123", + adapterName: "slack", + isDM: false + } + }; + + return { + conversationId: "conv-1", + sessionId: "turn-1", + toolCallId: "tool-1", + 
queueContext + } satisfies ToolCallOptions; +} + +describe("taskSubagent tool", () => { + beforeEach(() => { + getSubagentTaskRecordMock.mockReset(); + upsertSubagentTaskRecordMock.mockReset(); + enqueueSubagentTaskMock.mockReset(); + upsertSubagentTaskRecordMock.mockResolvedValue(undefined); + enqueueSubagentTaskMock.mockResolvedValue("subagent-msg-1"); + }); + + it("returns completed result when subagent record is complete", async () => { + getSubagentTaskRecordMock.mockResolvedValue({ + status: "completed", + resultText: "done" + }); + + const tool = createTaskSubagentTool(); + const result = await tool.execute?.({ task: "summarize this" }, baseOptions()); + + expect(result).toEqual( + expect.objectContaining({ + ok: true, + status: "completed", + output: "done" + }) + ); + expect(enqueueSubagentTaskMock).not.toHaveBeenCalled(); + }); + + it("enqueues a new subagent task and defers parent turn", async () => { + getSubagentTaskRecordMock.mockResolvedValue(undefined); + + const tool = createTaskSubagentTool(); + await expect(tool.execute?.({ task: "summarize this" }, baseOptions())).rejects.toBeInstanceOf( + RetryableTurnError + ); + + expect(upsertSubagentTaskRecordMock).toHaveBeenCalledTimes(1); + expect(enqueueSubagentTaskMock).toHaveBeenCalledTimes(1); + }); + + it("defers when an existing subagent task is still pending", async () => { + getSubagentTaskRecordMock.mockResolvedValue({ + status: "running" + }); + + const tool = createTaskSubagentTool(); + await expect(tool.execute?.({ task: "summarize this" }, baseOptions())).rejects.toBeInstanceOf( + RetryableTurnError + ); + + expect(enqueueSubagentTaskMock).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/junior/tests/unit/queue/process-thread-message-reaction.test.ts b/packages/junior/tests/unit/queue/process-thread-message-reaction.test.ts index 0a17da7a..b639838b 100644 --- a/packages/junior/tests/unit/queue/process-thread-message-reaction.test.ts +++ 
b/packages/junior/tests/unit/queue/process-thread-message-reaction.test.ts @@ -1,5 +1,6 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import type { ThreadMessagePayload } from "@/chat/queue/types"; +import { RetryableTurnError } from "@/chat/turn/errors"; const { getQueueMessageProcessingStateMock, @@ -120,4 +121,19 @@ describe("processQueuedThreadMessage reaction regressions", () => { "Failed to remove processing reaction before sending queue response" ); }); + + it("acks queue ownership for deferred subagent turns", async () => { + const payload = createPayload(); + + await processQueuedThreadMessage(payload, { + clearProcessingReaction: vi.fn(async () => undefined), + processRuntime: vi.fn(async () => { + throw new RetryableTurnError("subagent_task_deferred", "pending child task"); + }), + logWarn: vi.fn() + }); + + expect(completeQueueMessageProcessingOwnershipMock).toHaveBeenCalledTimes(1); + expect(failQueueMessageProcessingOwnershipMock).not.toHaveBeenCalled(); + }); }); diff --git a/packages/junior/tests/unit/slack/tool-registration.test.ts b/packages/junior/tests/unit/slack/tool-registration.test.ts index 0ccc84d3..c9db4f07 100644 --- a/packages/junior/tests/unit/slack/tool-registration.test.ts +++ b/packages/junior/tests/unit/slack/tool-registration.test.ts @@ -12,6 +12,7 @@ describe("Slack tool registration", () => { expect(tools).not.toHaveProperty("slackChannelListMessages"); expect(tools).toHaveProperty("slackMessageAddReaction"); expect(tools).toHaveProperty("slackCanvasCreate"); + expect(tools).toHaveProperty("taskSubagent"); }); it("registers channel-scope tools in shared channel context", () => { @@ -22,6 +23,7 @@ describe("Slack tool registration", () => { expect(tools).toHaveProperty("slackChannelListMessages"); expect(tools).toHaveProperty("slackMessageAddReaction"); expect(tools).toHaveProperty("slackCanvasCreate"); + expect(tools).toHaveProperty("taskSubagent"); }); it("does not register canvas create when channel context 
is unavailable", () => { @@ -32,5 +34,21 @@ describe("Slack tool registration", () => { expect(tools).not.toHaveProperty("slackChannelListMembers"); expect(tools).not.toHaveProperty("slackChannelListMessages"); expect(tools).not.toHaveProperty("slackMessageAddReaction"); + expect(tools).toHaveProperty("taskSubagent"); + }); + + it("does not register slack tools or recursive subagent tool in subagent execution mode", () => { + const tools = createTools([], {}, { + channelId: "C12345", + isSubagentExecution: true, + sandbox: noopSandbox + }); + + expect(tools).not.toHaveProperty("slackCanvasCreate"); + expect(tools).not.toHaveProperty("slackCanvasUpdate"); + expect(tools).not.toHaveProperty("slackListCreate"); + expect(tools).not.toHaveProperty("slackChannelPostMessage"); + expect(tools).not.toHaveProperty("slackMessageAddReaction"); + expect(tools).not.toHaveProperty("taskSubagent"); }); });