Commit 75679c7

fix(sdk): chat HITL continuations no longer break the next LLM call (#3719)
## Summary

Multi-step reasoning agents with HITL tools (OpenAI Responses with `store: false`, Anthropic extended thinking, etc.) failed on `chat.addToolOutput(...)` continuations — either the wire payload blew the `.in/append` cap (reasoning blobs + tool inputs routinely > 512 KiB), or app-side slimming workarounds got overwritten server-side and the next LLM call landed a tool call with no `arguments`. Both modes are fixed.

## Design

The per-turn merge in `chat.agent` now overlays only the tool-part state advances (`output-available` / `output-error` / `approval-responded` / `output-denied`) from the wire copy onto the hydrated/snapshot chain. Previously it replaced the entire message, which dropped `input`, reasoning, and text from the LLM's view whenever the wire was slim.

In parallel, `TriggerChatTransport.sendMessages` and `AgentChat.sendRaw` now slim the assistant message themselves on `submit-message` continuations: ship `{ id, role, parts: [<resolved tool part only>] }`, everything else reconstructed server-side from `hydrateMessages` or the durable snapshot. Continuation payloads drop from 600 KiB – 1 MiB to ~1 KiB.

`references/ai-chat` `aiChatHydrated.hydrateMessages` now upserts by id instead of pushing. With slim continuations, a blind push duplicates the assistant id in the returned chain — the merge updates the first match, the slim duplicate goes straight to `toModelMessages` with no `input`, and the LLM 4xx's. This is the canonical pattern customers should mirror in their own hydrate implementations.

## Test plan

- 11 new tests (slim helper unit + slim+merge integration for HITL, approval, default no-hydrate branch)
- Full SDK suite: 239 tests pass across 19 files
- End-to-end sweep against `references/ai-chat`: 19 customer-side smoke tests green; HITL wire bodies confirmed at ~1 KiB (was 600 KiB+); no provider 4xx errors across OpenAI Responses or Anthropic
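The duplicate-id failure described under Design can be reproduced in a few lines. This is a minimal sketch, not the code from `references/ai-chat`: the `Msg` shape is a simplified stand-in for the AI SDK's `UIMessage`, and `blindPush` / `pushIfNew` are hypothetical helpers written only for this illustration.

```typescript
// Simplified stand-in for `UIMessage` (assumption, illustration only).
type Msg = {
  id: string;
  role: "user" | "assistant";
  parts: Array<Record<string, unknown>>;
};

// Hypothetical helper: always appends, even when the id is already stored.
function blindPush(stored: Msg[], incoming: Msg): Msg[] {
  return [...stored, incoming];
}

// Hypothetical helper: appends only when the id is new. On a HITL
// continuation the id already exists, so the chain is left untouched and
// the runtime's per-turn merge applies the tool-state advance instead.
function pushIfNew(stored: Msg[], incoming: Msg): Msg[] {
  return stored.some((m) => m.id === incoming.id) ? stored : [...stored, incoming];
}

const stored: Msg[] = [
  { id: "u1", role: "user", parts: [{ type: "text", text: "Delete the staging DB?" }] },
  {
    id: "a1",
    role: "assistant",
    parts: [
      { type: "tool-confirm", toolCallId: "c1", state: "input-available", input: { db: "staging" } },
    ],
  },
];

// Slim HITL continuation: same assistant id, resolved tool part only, no `input`.
const slimContinuation: Msg = {
  id: "a1",
  role: "assistant",
  parts: [{ type: "tool-confirm", toolCallId: "c1", state: "output-available", output: "yes" }],
};

const duplicated = blindPush(stored, slimContinuation);
const deduped = pushIfNew(stored, slimContinuation);

console.log(duplicated.filter((m) => m.id === "a1").length); // 2
console.log(deduped.filter((m) => m.id === "a1").length); // 1
```

With the blind push, the second `a1` entry carries no `input`, which is the copy that would reach the model conversion step.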
1 parent c0365d3 commit 75679c7

8 files changed

Lines changed: 1256 additions & 55 deletions

.changeset/chat-slim-wire-merge.md

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+---
+"@trigger.dev/sdk": patch
+---
+
+Fix `chat.agent` HITL continuations on reasoning-heavy turns. Two changes that work together:
+
+- The per-turn merge now overlays the wire copy's tool-part state advancement onto the agent's existing chain — `state` + the matching resolution field (`output` / `errorText` / `approval`) come from the wire, everything else (text, reasoning, tool `input`, provider metadata) stays whatever the snapshot or `hydrateMessages` returned. Previously a full-message replace overwrote those fields with whatever the client shipped, so a slimmed wire copy landed a tool call with no `arguments` on the next LLM call. Covers `output-available` / `output-error` (HITL `addToolOutput`) and `approval-responded` / `output-denied` (approval flow).
+- `TriggerChatTransport.sendMessages` and `AgentChat.sendRaw` now slim assistant messages that carry advanced tool parts. The wire payload is just `{ id, role, parts: [<state + resolution field>] }` for `submit-message` continuations; everything else passes through. Reasoning blobs and full tool inputs no longer ride the wire on every `addToolOutput` / `addToolApproveResponse`, so continuation payloads stay well under the `.in/append` cap on long agent loops.
+
+Note: `onValidateMessages` receives the slim wire on HITL turns. If you call `validateUIMessages` from `ai` against the full `messages` array it will reject the slim assistant; filter to user messages (or skip on HITL turns) — see the updated docstring on `onValidateMessages` for the recommended pattern.
+
+For `hydrateMessages` hooks that persist the chain, this release also adds a small helper to the `@trigger.dev/sdk/ai` surface:
+
+```ts
+import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai";
+
+chat.agent({
+  hydrateMessages: async ({ chatId, trigger, incomingMessages }) => {
+    const record = await db.chat.findUnique({ where: { id: chatId } });
+    const stored = record?.messages ?? [];
+    if (upsertIncomingMessage(stored, { trigger, incomingMessages })) {
+      await db.chat.update({ where: { id: chatId }, data: { messages: stored } });
+    }
+    return stored;
+  },
+});
+```
+
+It pushes fresh user messages by id, no-ops on HITL continuations (the incoming shares an id with the existing assistant — the runtime overlays the new tool-state advance), and skips on non-`submit-message` triggers. Returns `true` if it mutated `stored` so the caller knows whether to persist.
+
+Net effect: `chat.addToolOutput(...)` / `chat.addToolApproveResponse(...)` on multi-step reasoning agents (OpenAI Responses with `store: false`, Anthropic extended thinking, etc.) no longer blows the cap and no longer corrupts the LLM input.
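The overlay behaviour described in the first changeset bullet can be sketched as follows. This is an illustration under stated assumptions, not the SDK's `mergeIncomingIntoHydrated` (which lives in `ai.ts` and is not part of this diff): the `Part` / `Msg` types are simplified stand-ins, `overlayToolAdvances` is a hypothetical name, and matching parts by `toolCallId` is an assumption about how the pairing works.

```typescript
// Simplified stand-ins for the AI SDK message types (assumption).
type Part = { type: string; [key: string]: unknown };
type Msg = { id: string; role: "user" | "assistant"; parts: Part[] };

// States the client is allowed to advance, per the changeset text.
const ADVANCED_STATES = new Set([
  "output-available",
  "output-error",
  "approval-responded",
  "output-denied",
]);
const RESOLUTION_FIELDS = ["output", "errorText", "approval"] as const;

// Hypothetical sketch: copy only `state` plus the resolution fields from the
// wire copy onto the hydrated message; every other field stays as hydrated.
function overlayToolAdvances(hydrated: Msg, wire: Msg): Msg {
  const advances = new Map<string, Part>();
  for (const p of wire.parts) {
    const id = p.toolCallId;
    const state = p.state;
    if (typeof id === "string" && typeof state === "string" && ADVANCED_STATES.has(state)) {
      advances.set(id, p);
    }
  }
  const parts = hydrated.parts.map((p) => {
    const id = p.toolCallId;
    const adv = typeof id === "string" ? advances.get(id) : undefined;
    if (!adv) return p; // text, reasoning, and unresolved tool parts are untouched
    const merged: Part = { ...p, state: adv.state };
    for (const field of RESOLUTION_FIELDS) {
      if (adv[field] !== undefined) merged[field] = adv[field];
    }
    return merged;
  });
  return { ...hydrated, parts };
}

const hydrated: Msg = {
  id: "a1",
  role: "assistant",
  parts: [
    { type: "reasoning", text: "User must confirm first." },
    { type: "tool-confirm", toolCallId: "c1", state: "input-available", input: { db: "staging" } },
  ],
};
const wire: Msg = {
  id: "a1",
  role: "assistant",
  parts: [{ type: "tool-confirm", toolCallId: "c1", state: "output-available", output: "yes" }],
};

const merged = overlayToolAdvances(hydrated, wire);
// The tool part now has state "output-available" and output "yes",
// while `input` and the reasoning part are still the hydrated values.
console.log(merged.parts[1]);
```

A full-message replace would instead return `wire` as-is, which is how the tool call lost its `input` before this change.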

packages/trigger-sdk/src/v3/ai-shared.ts

Lines changed: 148 additions & 0 deletions
@@ -198,3 +198,151 @@ export type InferChatUIMessage<TTask extends AnyTask> = TTask extends Task<
 >
   ? TUIM
   : UIMessage;
+
+/**
+ * Upsert an incoming wire message into the customer's DB-backed chain
+ * inside a `hydrateMessages` hook. Returns `true` iff the chain was
+ * mutated (the caller should persist).
+ *
+ * Handles the three cases that matter:
+ *
+ * - **Non-submit-message trigger** (`regenerate-message` / `action`,
+ *   or `submit-message` with no incoming): no-op. Returns `false`.
+ * - **Incoming id already in `stored`** (HITL `addToolOutput` /
+ *   `addToolApproveResponse` continuation — the wire carries the
+ *   existing assistant's id with a slim resolution payload): no-op.
+ *   The runtime's per-turn merge overlays the new tool-state advance
+ *   onto the existing entry; pushing again would duplicate the row
+ *   in the chain you return, and the duplicate slim copy would hit
+ *   `toModelMessages` with no `input`. Returns `false`.
+ * - **Incoming id not in `stored`** (typically a fresh user message
+ *   on a new turn): push. Returns `true`.
+ *
+ * Mutates `stored` in place. The caller persists `stored`, not the
+ * return value.
+ *
+ * @example
+ * ```ts
+ * import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai";
+ *
+ * chat.agent({
+ *   hydrateMessages: async ({ chatId, trigger, incomingMessages }) => {
+ *     const record = await db.chat.findUnique({ where: { id: chatId } });
+ *     const stored = record?.messages ?? [];
+ *     if (upsertIncomingMessage(stored, { trigger, incomingMessages })) {
+ *       await db.chat.update({ where: { id: chatId }, data: { messages: stored } });
+ *     }
+ *     return stored;
+ *   },
+ * });
+ * ```
+ */
+export function upsertIncomingMessage<TMsg extends UIMessage = UIMessage>(
+  stored: TMsg[],
+  event: {
+    trigger: "submit-message" | "regenerate-message" | "action";
+    incomingMessages: TMsg[];
+  }
+): boolean {
+  if (event.trigger !== "submit-message") return false;
+  if (event.incomingMessages.length === 0) return false;
+  const newMsg = event.incomingMessages[event.incomingMessages.length - 1];
+  if (!newMsg) return false;
+  if (newMsg.id) {
+    const existingIdx = stored.findIndex((m) => m.id === newMsg.id);
+    if (existingIdx !== -1) return false;
+  }
+  stored.push(newMsg);
+  return true;
+}
+
+/**
+ * Tool-part states that the client advances and ships back over the wire.
+ * Covers HITL `addToolOutput` (output-available / output-error) and the
+ * approval flow (approval-responded / output-denied). `input-streaming` /
+ * `input-available` / `approval-requested` are server-emitted only — if
+ * we see them on the wire we treat them as no-ops and skip the slim/merge.
+ */
+function isWireAdvanceableToolState(
+  state: unknown
+): state is "output-available" | "output-error" | "approval-responded" | "output-denied" {
+  return (
+    state === "output-available" ||
+    state === "output-error" ||
+    state === "approval-responded" ||
+    state === "output-denied"
+  );
+}
+
+/** Whether a tool-UI part is a static (`tool-${name}`) or dynamic tool. */
+function isToolPartType(type: unknown): boolean {
+  return typeof type === "string" && (type.startsWith("tool-") || type === "dynamic-tool");
+}
+
+/**
+ * Slim an outgoing assistant message before it ships on `submit-message`.
+ *
+ * When the client calls `addToolOutput(...)` to resolve a HITL tool (or
+ * `addToolApproveResponse(...)` to approve/deny one), the AI SDK turns
+ * it into a `submit-message` whose `messages.at(-1)` is the existing
+ * assistant message with the new state stitched onto a single tool
+ * part. On a reasoning-heavy multi-step turn, that full assistant
+ * message can be 600 KB – 1 MB (encrypted reasoning blobs, reasoning
+ * text, full tool `input` JSON, prior tool outputs) — well over the
+ * `.in/append` cap.
+ *
+ * The agent runtime only consumes the wire-advanced fields of those
+ * tool parts (state + output / errorText / approval). Everything else
+ * (text, reasoning, tool `input`) is rebuilt server-side from the
+ * durable snapshot or `hydrateMessages`. So we drop everything but
+ * the advanced tool parts here, and reduce those to just the fields
+ * the server overlays.
+ *
+ * The slim only fires when the assistant message carries at least one
+ * wire-advanceable tool part. Plain assistant resends (no resolved /
+ * approval-responded tool) and non-assistant messages pass through
+ * untouched.
+ *
+ * Pairs with the per-turn merge on the agent side
+ * (`mergeIncomingIntoHydrated` in `ai.ts`).
+ */
+export function slimSubmitMessageForWire<TMsg extends UIMessage | undefined>(
+  message: TMsg
+): TMsg {
+  if (!message) return message;
+  if (message.role !== "assistant") return message;
+  const parts = (message.parts ?? []) as any[];
+  const advancedToolParts = parts.filter(
+    (p) =>
+      p &&
+      typeof p === "object" &&
+      isToolPartType(p.type) &&
+      isWireAdvanceableToolState(p.state)
+  );
+  if (advancedToolParts.length === 0) return message;
+  const slimParts = advancedToolParts.map((p: any) => {
+    const base: Record<string, unknown> = {
+      type: p.type,
+      toolCallId: p.toolCallId,
+      state: p.state,
+    };
+    if (p.type === "dynamic-tool" && typeof p.toolName === "string") {
+      base.toolName = p.toolName;
+    }
+    if (p.state === "output-available") {
+      base.output = p.output;
+      if (p.approval !== undefined) base.approval = p.approval;
+    } else if (p.state === "output-error") {
+      if (p.errorText !== undefined) base.errorText = p.errorText;
+      if (p.approval !== undefined) base.approval = p.approval;
+    } else if (p.state === "approval-responded" || p.state === "output-denied") {
+      if (p.approval !== undefined) base.approval = p.approval;
+    }
+    return base;
+  });
+  return {
+    id: message.id,
+    role: message.role,
+    parts: slimParts,
+  } as unknown as TMsg;
+}
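To get a feel for the size difference the slim helper's docstring describes, the serialized byte count of a full and a slimmed assistant message can be compared directly. The sketch below is illustrative only: `wireBytes` is a hypothetical helper, the 512 KiB figure is an assumed cap taken from the commit description rather than a documented constant, and the slim payload is written by hand in the shape `slimSubmitMessageForWire` produces.

```typescript
// Assumed cap for illustration; the commit text only mentions that
// payloads above 512 KiB were failing.
const ASSUMED_CAP_BYTES = 512 * 1024;

// Hypothetical helper: UTF-8 byte length of a value once JSON-serialized.
function wireBytes(payload: unknown): number {
  return new TextEncoder().encode(JSON.stringify(payload)).length;
}

// Full assistant message: a large reasoning blob plus the resolved tool part.
const fullAssistant = {
  id: "a1",
  role: "assistant",
  parts: [
    { type: "reasoning", text: "x".repeat(600 * 1024) },
    {
      type: "tool-confirm",
      toolCallId: "c1",
      state: "output-available",
      input: { db: "staging" },
      output: "yes",
    },
  ],
};

// Hand-written slim copy: only type, toolCallId, state, and the resolution field.
const slimAssistant = {
  id: "a1",
  role: "assistant",
  parts: [{ type: "tool-confirm", toolCallId: "c1", state: "output-available", output: "yes" }],
};

const fullBytes = wireBytes(fullAssistant);
const slimBytes = wireBytes(slimAssistant);

console.log(fullBytes > ASSUMED_CAP_BYTES); // true
console.log(slimBytes < 1024); // true
```

Measuring the JSON-encoded bytes rather than string length matters when tool outputs contain non-ASCII text, since those characters take more than one byte in UTF-8.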
