Skip to content

Commit 41f138d

Browse files
committed
feat(sdk): upsertIncomingMessage helper for hydrateMessages
1 parent 0e54bef commit 41f138d

5 files changed

Lines changed: 201 additions & 23 deletions

File tree

.changeset/chat-slim-wire-merge.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,23 @@ Fix `chat.agent` HITL continuations on reasoning-heavy turns. Two changes that w
99

1010
Note: `onValidateMessages` receives the slim wire on HITL turns. If you call `validateUIMessages` from `ai` against the full `messages` array it will reject the slim assistant; filter to user messages (or skip on HITL turns) — see the updated docstring on `onValidateMessages` for the recommended pattern.
1111

12+
For `hydrateMessages` hooks that persist the chain, this release also adds a small helper to the `@trigger.dev/sdk/ai` surface:
13+
14+
```ts
15+
import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai";
16+
17+
chat.agent({
18+
hydrateMessages: async ({ chatId, trigger, incomingMessages }) => {
19+
const record = await db.chat.findUnique({ where: { id: chatId } });
20+
const stored = record?.messages ?? [];
21+
if (upsertIncomingMessage(stored, { trigger, incomingMessages })) {
22+
await db.chat.update({ where: { id: chatId }, data: { messages: stored } });
23+
}
24+
return stored;
25+
},
26+
});
27+
```
28+
29+
It pushes fresh user messages by id, no-ops on HITL continuations (the incoming shares an id with the existing assistant — the runtime overlays the new tool-state advance), and skips on non-`submit-message` triggers. Returns `true` if it mutated `stored` so the caller knows whether to persist.
30+
1231
Net effect: `chat.addToolOutput(...)` / `chat.addToolApproveResponse(...)` on multi-step reasoning agents (OpenAI Responses with `store: false`, Anthropic extended thinking, etc.) no longer blows the cap and no longer corrupts the LLM input.

packages/trigger-sdk/src/v3/ai-shared.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,63 @@ export type InferChatUIMessage<TTask extends AnyTask> = TTask extends Task<
199199
? TUIM
200200
: UIMessage;
201201

202+
/**
203+
* Upsert an incoming wire message into the customer's DB-backed chain
204+
* inside a `hydrateMessages` hook. Returns `true` iff the chain was
205+
* mutated (the caller should persist).
206+
*
207+
* Handles the three cases that matter:
208+
*
209+
* - **Non-submit-message trigger** (`regenerate-message` / `action`,
210+
* or `submit-message` with no incoming): no-op. Returns `false`.
211+
* - **Incoming id already in `stored`** (HITL `addToolOutput` /
212+
* `addToolApproveResponse` continuation — the wire carries the
213+
* existing assistant's id with a slim resolution payload): no-op.
214+
* The runtime's per-turn merge overlays the new tool-state advance
215+
* onto the existing entry; pushing again would duplicate the row
216+
* in the chain you return, and the duplicate slim copy would hit
217+
* `toModelMessages` with no `input`. Returns `false`.
218+
* - **Incoming id not in `stored`** (typically a fresh user message
219+
* on a new turn): push. Returns `true`.
220+
*
221+
* Mutates `stored` in place. The caller persists `stored`, not the
222+
* return value.
223+
*
224+
* @example
225+
* ```ts
226+
* import { chat, upsertIncomingMessage } from "@trigger.dev/sdk/ai";
227+
*
228+
* chat.agent({
229+
* hydrateMessages: async ({ chatId, trigger, incomingMessages }) => {
230+
* const record = await db.chat.findUnique({ where: { id: chatId } });
231+
* const stored = record?.messages ?? [];
232+
* if (upsertIncomingMessage(stored, { trigger, incomingMessages })) {
233+
* await db.chat.update({ where: { id: chatId }, data: { messages: stored } });
234+
* }
235+
* return stored;
236+
* },
237+
* });
238+
* ```
239+
*/
240+
export function upsertIncomingMessage<TMsg extends UIMessage = UIMessage>(
241+
stored: TMsg[],
242+
event: {
243+
trigger: "submit-message" | "regenerate-message" | "action";
244+
incomingMessages: TMsg[];
245+
}
246+
): boolean {
247+
if (event.trigger !== "submit-message") return false;
248+
if (event.incomingMessages.length === 0) return false;
249+
const newMsg = event.incomingMessages[event.incomingMessages.length - 1];
250+
if (!newMsg) return false;
251+
if (newMsg.id) {
252+
const existingIdx = stored.findIndex((m) => m.id === newMsg.id);
253+
if (existingIdx !== -1) return false;
254+
}
255+
stored.push(newMsg);
256+
return true;
257+
}
258+
202259
/**
203260
* Tool-part states that the client advances and ships back over the wire.
204261
* Covers HITL `addToolOutput` (output-available / output-error) and the

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2571,7 +2571,7 @@ export type PendingMessagesOptions<TUIM extends UIMessage = UIMessage> = {
25712571
// React hooks (`@trigger.dev/sdk/chat/react`) can import it without
25722572
// dragging `ai.ts` into the browser graph. Re-exported here so
25732573
// `@trigger.dev/sdk/ai` consumers still see it.
2574-
export { PENDING_MESSAGE_INJECTED_TYPE } from "./ai-shared.js";
2574+
export { PENDING_MESSAGE_INJECTED_TYPE, upsertIncomingMessage } from "./ai-shared.js";
25752575
import { PENDING_MESSAGE_INJECTED_TYPE } from "./ai-shared.js";
25762576

25772577
/** @internal */

packages/trigger-sdk/test/wire-shape.test.ts

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import "../src/v3/test/index.js";
1111
import type { UIMessage } from "ai";
1212
import { describe, expect, expectTypeOf, it } from "vitest";
1313
import type { ChatInputChunk, ChatTaskWirePayload } from "../src/v3/ai-shared.js";
14-
import { slimSubmitMessageForWire } from "../src/v3/ai-shared.js";
14+
import { slimSubmitMessageForWire, upsertIncomingMessage } from "../src/v3/ai-shared.js";
1515

1616
describe("ChatTaskWirePayload (slim wire shape)", () => {
1717
it("encodes and decodes a submit-message payload through JSON", () => {
@@ -158,6 +158,119 @@ describe("ChatTaskWirePayload (slim wire shape)", () => {
158158
});
159159
});
160160

161+
describe("upsertIncomingMessage", () => {
162+
const userMsg = (id: string, text: string): UIMessage => ({
163+
id,
164+
role: "user",
165+
parts: [{ type: "text", text }],
166+
});
167+
168+
it("pushes a fresh user message and returns true", () => {
169+
const stored: UIMessage[] = [userMsg("u-1", "first")];
170+
const mutated = upsertIncomingMessage(stored, {
171+
trigger: "submit-message",
172+
incomingMessages: [userMsg("u-2", "second")],
173+
});
174+
expect(mutated).toBe(true);
175+
expect(stored).toHaveLength(2);
176+
expect(stored[1]!.id).toBe("u-2");
177+
});
178+
179+
it("no-ops when incoming id is already in stored (HITL continuation)", () => {
180+
const head = {
181+
id: "asst-1",
182+
role: "assistant" as const,
183+
parts: [{ type: "tool-search", toolCallId: "tc-1", state: "input-available", input: {} } as never],
184+
};
185+
const stored: UIMessage[] = [userMsg("u-1", "hi"), head];
186+
const slim = {
187+
id: "asst-1",
188+
role: "assistant" as const,
189+
parts: [{ type: "tool-search", toolCallId: "tc-1", state: "output-available", output: {} } as never],
190+
};
191+
const mutated = upsertIncomingMessage(stored, {
192+
trigger: "submit-message",
193+
incomingMessages: [slim],
194+
});
195+
expect(mutated).toBe(false);
196+
expect(stored).toHaveLength(2);
197+
// The original head is untouched — the runtime's per-turn merge
198+
// overlays the resolution; the customer's stored array is just
199+
// the pre-merge snapshot.
200+
expect(stored[1]).toBe(head);
201+
});
202+
203+
it("no-ops on regenerate-message trigger", () => {
204+
const stored: UIMessage[] = [userMsg("u-1", "hi")];
205+
const mutated = upsertIncomingMessage(stored, {
206+
trigger: "regenerate-message",
207+
incomingMessages: [userMsg("u-2", "ignored")],
208+
});
209+
expect(mutated).toBe(false);
210+
expect(stored).toHaveLength(1);
211+
});
212+
213+
it("no-ops on action trigger", () => {
214+
const stored: UIMessage[] = [userMsg("u-1", "hi")];
215+
const mutated = upsertIncomingMessage(stored, {
216+
trigger: "action",
217+
incomingMessages: [],
218+
});
219+
expect(mutated).toBe(false);
220+
expect(stored).toHaveLength(1);
221+
});
222+
223+
it("no-ops on empty incomingMessages", () => {
224+
const stored: UIMessage[] = [userMsg("u-1", "hi")];
225+
const mutated = upsertIncomingMessage(stored, {
226+
trigger: "submit-message",
227+
incomingMessages: [],
228+
});
229+
expect(mutated).toBe(false);
230+
expect(stored).toHaveLength(1);
231+
});
232+
233+
it("only inspects the last incoming message (slim wire ships at most one)", () => {
234+
const stored: UIMessage[] = [userMsg("u-1", "hi")];
235+
const mutated = upsertIncomingMessage(stored, {
236+
trigger: "submit-message",
237+
incomingMessages: [userMsg("ignored", "ignored"), userMsg("u-3", "new")],
238+
});
239+
expect(mutated).toBe(true);
240+
expect(stored).toHaveLength(2);
241+
expect(stored[1]!.id).toBe("u-3");
242+
});
243+
244+
it("pushes when newMsg has no id (no dedup possible)", () => {
245+
const stored: UIMessage[] = [userMsg("u-1", "hi")];
246+
const incoming = { role: "user", parts: [{ type: "text", text: "no id" }] } as unknown as UIMessage;
247+
const mutated = upsertIncomingMessage(stored, {
248+
trigger: "submit-message",
249+
incomingMessages: [incoming],
250+
});
251+
expect(mutated).toBe(true);
252+
expect(stored).toHaveLength(2);
253+
});
254+
255+
it("accepts the full hydrateMessages event without re-packaging", () => {
256+
// Customers can pass the destructured event directly — the helper
257+
// only reads `trigger` + `incomingMessages` but ignores any other
258+
// fields the event happens to carry.
259+
const stored: UIMessage[] = [];
260+
const event = {
261+
chatId: "chat-1",
262+
turn: 0,
263+
trigger: "submit-message" as const,
264+
incomingMessages: [userMsg("u-1", "hi")],
265+
previousMessages: [],
266+
continuation: false,
267+
};
268+
const mutated = upsertIncomingMessage(stored, event);
269+
expect(mutated).toBe(true);
270+
expect(stored).toHaveLength(1);
271+
});
272+
});
273+
161274
describe("slimSubmitMessageForWire", () => {
162275
it("passes user messages through unchanged", () => {
163276
const userMsg: UIMessage = {

references/ai-chat/src/trigger/chat.ts

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
1+
import { chat, upsertIncomingMessage, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
22
import { logger, prompts, skills } from "@trigger.dev/sdk";
33

44
import {
@@ -887,30 +887,19 @@ export const aiChatHydrated = chat
887887

888888
// Load message history from the database on every turn.
889889
// The frontend's accumulated messages are ignored — the DB is the
890-
// single source of truth. New user messages arrive in `incomingMessages`
891-
// and are upserted by id + persisted before returning.
892-
//
893-
// HITL continuations (tool-output answers from `addToolOutput`) arrive
894-
// as a slim assistant message whose id matches the existing assistant
895-
// already in the chain — those land via the SDK's per-turn merge, so
896-
// they're skipped here. Only brand-new ids (typically the user's next
897-
// message) get appended.
890+
// single source of truth. `upsertIncomingMessage` handles HITL
891+
// continuations (slim wire sharing an id with the existing
892+
// assistant — no-op so the runtime overlays the new state) and
893+
// fresh user messages (push + persist).
898894
hydrateMessages: async ({ chatId, trigger, incomingMessages }) => {
899895
const record = await prisma.chat.findUnique({ where: { id: chatId } });
900896
const stored = (record?.messages as unknown as UIMessage[]) ?? [];
901897

902-
if (trigger === "submit-message" && incomingMessages.length > 0) {
903-
const newMsg = incomingMessages[incomingMessages.length - 1]!;
904-
const existingIdx = newMsg.id
905-
? stored.findIndex((m) => m.id === newMsg.id)
906-
: -1;
907-
if (existingIdx === -1) {
908-
stored.push(newMsg);
909-
await prisma.chat.update({
910-
where: { id: chatId },
911-
data: { messages: stored as unknown as ChatMessagesForWrite },
912-
});
913-
}
898+
if (upsertIncomingMessage(stored, { trigger, incomingMessages })) {
899+
await prisma.chat.update({
900+
where: { id: chatId },
901+
data: { messages: stored as unknown as ChatMessagesForWrite },
902+
});
914903
}
915904

916905
return stored;

0 commit comments

Comments
 (0)