Skip to content

Commit 84c5a22

Browse files
committed
fix(sdk): address PR #3671 review — per-record metadata, beforeBoot bubbles, drain queue on error
1 parent 066ce93 commit 84c5a22

3 files changed

Lines changed: 62 additions & 20 deletions

File tree

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,7 @@ export async function __replaySessionOutTailProductionPathForTests<
688688
type ReplaySessionInTailImpl = <TUIMessage extends UIMessage>(
689689
sessionId: string,
690690
options?: { lastEventId?: string }
691-
) => Promise<{ message: TUIMessage; seqNum: number }[]>;
691+
) => Promise<{ message: TUIMessage; metadata: unknown; seqNum: number }[]>;
692692
let replaySessionInTailImpl: ReplaySessionInTailImpl | undefined;
693693

694694
export function __setReplaySessionInTailImplForTests(
@@ -724,15 +724,15 @@ export function __setReplaySessionInTailImplForTests(
724724
async function replaySessionInTail<TUIMessage extends UIMessage>(
725725
sessionId: string,
726726
options?: { lastEventId?: string }
727-
): Promise<{ message: TUIMessage; seqNum: number }[]> {
727+
): Promise<{ message: TUIMessage; metadata: unknown; seqNum: number }[]> {
728728
if (replaySessionInTailImpl) {
729729
return await replaySessionInTailImpl<TUIMessage>(sessionId, options);
730730
}
731731
const apiClient = apiClientManager.clientOrThrow();
732732
const response = await apiClient.readSessionStreamRecords(sessionId, "in", {
733733
afterEventId: options?.lastEventId,
734734
});
735-
const out: { message: TUIMessage; seqNum: number }[] = [];
735+
const out: { message: TUIMessage; metadata: unknown; seqNum: number }[] = [];
736736
for (const record of response.records) {
737737
// session.in writers POST `JSON.stringify(chunk)` directly; the
738738
// webapp wraps that in `{ data: <string>, id }` and stores it on
@@ -752,11 +752,19 @@ async function replaySessionInTail<TUIMessage extends UIMessage>(
752752
if (!chunk || typeof chunk !== "object") continue;
753753
const kind = (chunk as { kind?: unknown }).kind;
754754
if (kind !== "message") continue;
755-
const payload = (chunk as { payload?: { trigger?: unknown; message?: unknown } }).payload;
755+
const payload = (
756+
chunk as {
757+
payload?: { trigger?: unknown; message?: unknown; metadata?: unknown };
758+
}
759+
).payload;
756760
if (!payload || payload.trigger !== "submit-message") continue;
757761
const message = payload.message;
758762
if (!message || typeof message !== "object") continue;
759-
out.push({ message: message as TUIMessage, seqNum: record.seqNum });
763+
out.push({
764+
message: message as TUIMessage,
765+
metadata: payload.metadata,
766+
seqNum: record.seqNum,
767+
});
760768
}
761769
return out;
762770
}
@@ -773,7 +781,7 @@ export async function __replaySessionInTailProductionPathForTests<
773781
>(
774782
sessionId: string,
775783
options?: { lastEventId?: string }
776-
): Promise<{ message: TUIMessage; seqNum: number }[]> {
784+
): Promise<{ message: TUIMessage; metadata: unknown; seqNum: number }[]> {
777785
const saved = replaySessionInTailImpl;
778786
replaySessionInTailImpl = undefined;
779787
try {
@@ -4964,7 +4972,7 @@ function chatAgent<
49644972
let bootSnapshot: ChatSnapshotV1<TUIMessage> | undefined;
49654973
let replayedSettled: TUIMessage[] = [];
49664974
let replayedPartial: TUIMessage | undefined;
4967-
let replayedInTail: { message: TUIMessage; seqNum: number }[] = [];
4975+
let replayedInTail: { message: TUIMessage; metadata: unknown; seqNum: number }[] = [];
49684976
// Wire payloads to dispatch as turns before the regular session.in
49694977
// pump kicks in. Populated by `onRecoveryBoot.recoveredTurns` (or its
49704978
// default, `inFlightUsers`). The turn-loop checks this queue ahead of
@@ -5222,14 +5230,11 @@ function chatAgent<
52225230
} else {
52235231
recoveredTurns = inFlightUsers;
52245232
}
5233+
// `beforeBoot` errors bubble — the customer opted into blocking
5234+
// persistence and a failure there should fail the run rather than
5235+
// dispatch recovered turns against half-persisted state.
52255236
if (hookBeforeBoot) {
5226-
try {
5227-
await hookBeforeBoot();
5228-
} catch (error) {
5229-
logger.warn("chat.agent: onRecoveryBoot.beforeBoot threw; continuing", {
5230-
error: error instanceof Error ? error.message : String(error),
5231-
});
5232-
}
5237+
await hookBeforeBoot();
52335238
}
52345239

52355240
// Advance the session.in cursor past every recovered user so
@@ -5247,19 +5252,34 @@ function chatAgent<
52475252
// pops these ahead of `messagesInput.waitWithIdleTimeout` so they
52485253
// dispatch as normal turns with the existing hook stack.
52495254
//
5255+
// Per-record metadata preservation: each session.in record
5256+
// carries its own `payload.metadata` (the transport sets it at
5257+
// send time). Look up the original by message id so a recovered
5258+
// turn dispatches with the metadata its writer actually sent.
5259+
// Fall back to the boot payload's metadata for hook-synthesized
5260+
// messages (customer returned a recoveredTurn with no matching
5261+
// session.in record).
5262+
//
52505263
// OOM-retry dedup: if `payload.message` is the same user message
52515264
// the queue is about to redispatch (the wire payload survives
52525265
// across attempts, but session.in records it once), the wire
52535266
// payload already runs turn 0 — drop the duplicate from the queue
52545267
// so we don't fire the same turn twice.
52555268
const wireMessageId =
52565269
(payload.message as { id?: string } | undefined)?.id;
5270+
const metadataById = new Map<string, unknown>();
5271+
for (const entry of replayedInTail) {
5272+
metadataById.set(entry.message.id, entry.metadata);
5273+
}
52575274
for (const msg of recoveredTurns) {
52585275
if (wireMessageId && msg.id === wireMessageId) continue;
5276+
const recoveredMetadata = metadataById.has(msg.id)
5277+
? metadataById.get(msg.id)
5278+
: payload.metadata;
52595279
bootInjectedQueue.push({
52605280
chatId: payload.chatId,
52615281
sessionId: payload.sessionId,
5262-
metadata: payload.metadata,
5282+
metadata: recoveredMetadata,
52635283
trigger: "submit-message",
52645284
message: msg,
52655285
messageId: msg.id,
@@ -7161,6 +7181,14 @@ function chatAgent<
71617181
return;
71627182
}
71637183

7184+
// Drain remaining recovered turns before idling — a thrown
7185+
// recovered turn shouldn't strand the rest of the boot queue
7186+
// until an unrelated live message arrives.
7187+
if (bootInjectedQueue.length > 0) {
7188+
currentWirePayload = bootInjectedQueue.shift()!;
7189+
continue;
7190+
}
7191+
71647192
// Wait for the next message — same as after a successful turn
71657193
const effectiveIdleTimeout =
71667194
(metadata.get(IDLE_TIMEOUT_METADATA_KEY) as number | undefined) ??

packages/trigger-sdk/src/v3/test/mock-chat-agent.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -420,10 +420,16 @@ export function mockChatAgent(
420420
});
421421

422422
// session.in tail override: each seeded UIMessage becomes a
423-
// { message, seqNum: i+1 } entry. Mirrors the seq-num pattern from the
424-
// out-tail stub so cursor-advance logic is exercised correctly.
423+
// { message, metadata: undefined, seqNum: i+1 } entry. Mirrors the
424+
// seq-num pattern from the out-tail stub so cursor-advance logic is
425+
// exercised correctly. `metadata` is `undefined` for seeded users —
426+
// the boot path falls back to `payload.metadata` for those.
425427
__setReplaySessionInTailImplForTests(async () => {
426-
return seededSessionInMessages.map((message, i) => ({ message, seqNum: i + 1 })) as never;
428+
return seededSessionInMessages.map((message, i) => ({
429+
message,
430+
metadata: undefined,
431+
seqNum: i + 1,
432+
})) as never;
427433
});
428434

429435
// Install the session open override so `sessions.open(id)` returns a

packages/trigger-sdk/test/replay-session-in.test.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,24 @@ describe("replaySessionInTail", () => {
4343
const u1 = userMessage("u-1", "hello");
4444
const u2 = userMessage("u-2", "again");
4545
stubReadRecords([
46-
{ kind: "message", payload: { chatId: "c1", trigger: "submit-message", message: u1 } },
47-
{ kind: "message", payload: { chatId: "c1", trigger: "submit-message", message: u2 } },
46+
{
47+
kind: "message",
48+
payload: { chatId: "c1", trigger: "submit-message", message: u1, metadata: { userId: "a" } },
49+
},
50+
{
51+
kind: "message",
52+
payload: { chatId: "c1", trigger: "submit-message", message: u2, metadata: { userId: "b" } },
53+
},
4854
]);
4955

5056
const result = await replaySessionInTail("sess");
5157
expect(result).toHaveLength(2);
5258
expect(result[0]!.message.id).toBe("u-1");
5359
expect(result[0]!.seqNum).toBe(1);
60+
expect(result[0]!.metadata).toEqual({ userId: "a" });
5461
expect(result[1]!.message.id).toBe("u-2");
5562
expect(result[1]!.seqNum).toBe(2);
63+
expect(result[1]!.metadata).toEqual({ userId: "b" });
5664
});
5765

5866
it("ignores non-message variants (stop, handover, handover-skip)", async () => {

0 commit comments

Comments
 (0)