Skip to content

Commit 4e44d2b

Browse files
committed
feat(chat): add stopGeneration, fix onTurnComplete/onFinishPromise, add /chats/[chatId] route to ai-chat
1 parent 9ea85ce commit 4e44d2b

File tree

12 files changed

+453
-197
lines changed

12 files changed

+453
-197
lines changed

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3199,8 +3199,13 @@ function chatAgent<
31993199

32003200
// Wait for onFinish to fire — on abort this may resolve slightly
32013201
// after pipeChat, since the stream's cancel() handler is async.
3202+
// Race with a timeout so a stop-abort that prevents onFinish from
3203+
// firing doesn't hang the turn loop indefinitely.
32023204
if (onFinishAttached) {
3203-
await onFinishPromise;
3205+
await Promise.race([
3206+
onFinishPromise,
3207+
new Promise<void>((r) => setTimeout(r, 2_000)),
3208+
]);
32043209
}
32053210

32063211
// Capture token usage from the streamText result (if available).
@@ -3539,7 +3544,7 @@ function chatAgent<
35393544
async () => {
35403545
await onTurnComplete({
35413546
...turnCompleteEvent,
3542-
lastEventId: turnCompleteResult.lastEventId,
3547+
lastEventId: turnCompleteResult?.lastEventId,
35433548
});
35443549

35453550
// Check if onTurnComplete replaced messages (compaction)

packages/trigger-sdk/src/v3/chat.ts

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
607607
reconnectToStream = async (
608608
options: {
609609
chatId: string;
610+
abortSignal?: AbortSignal | undefined;
610611
} & ChatRequestOptions
611612
): Promise<ReadableStream<UIMessageChunk> | null> => {
612613
const session = this.sessions.get(options.chatId);
@@ -623,15 +624,88 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
623624
const abortController = new AbortController();
624625
this.activeStreams.set(options.chatId, abortController);
625626

627+
// When the AI SDK (or caller) provides an abortSignal (e.g. from
628+
// useChat's stop()), use it as the stream signal so stop sends
629+
// the stop input stream signal to the backend. Fall back to the
630+
// internal controller for stream lifecycle management.
631+
const abortSignal = options.abortSignal
632+
? AbortSignal.any([options.abortSignal, abortController.signal])
633+
: abortController.signal;
634+
626635
return this.subscribeToStream(
627636
session.runId,
628637
session.publicAccessToken,
629-
abortController.signal,
638+
abortSignal,
630639
options.chatId,
631-
{ sendStopOnAbort: false }
640+
// Send stop when the caller's signal fires (user-initiated stop).
641+
// The internal abortController is only for stream management.
642+
{ sendStopOnAbort: !!options.abortSignal }
632643
);
633644
};
634645

646+
/**
647+
* Stop the current generation for a chat session.
648+
*
649+
* Sends a stop signal to the backend task via input streams and closes
650+
* the active SSE connection. Use this as your stop button handler —
651+
* it works for both initial connections and reconnected streams
652+
* (after page refresh).
653+
*
654+
* When the upstream AI SDK fix lands (passing `abortSignal` through
655+
* `reconnectToStream`), `useChat`'s built-in `stop()` will also work.
656+
* Until then, use this method for reliable stop behavior.
657+
*
658+
* @returns `true` if the stop signal was sent, `false` if there's no active session.
659+
*
660+
* @example
661+
* ```tsx
662+
* const transport = useTriggerChatTransport({ task: "my-chat", ... });
663+
* const { messages, sendMessage } = useChat({ transport });
664+
*
665+
* <button onClick={() => transport.stopGeneration(chatId)}>Stop</button>
666+
* ```
667+
*/
668+
stopGeneration = async (chatId: string): Promise<boolean> => {
669+
const session = this.sessions.get(chatId);
670+
if (!session?.runId) return false;
671+
672+
const sendStop = async (token: string) => {
673+
const api = new ApiClient(this.baseURL, token);
674+
await api.sendInputStream(session.runId, CHAT_STOP_STREAM_ID, { stop: true });
675+
};
676+
677+
try {
678+
await sendStop(session.publicAccessToken);
679+
} catch (err) {
680+
if (isRunPatAuthError(err) && this.renewRunAccessToken) {
681+
const newToken = await this.renewRunPatForSession(chatId, session.runId);
682+
if (newToken) {
683+
try {
684+
await sendStop(newToken);
685+
} catch {
686+
return false;
687+
}
688+
} else {
689+
return false;
690+
}
691+
} else {
692+
return false;
693+
}
694+
}
695+
696+
session.skipToTurnComplete = true;
697+
698+
// Abort the active stream (if any) to close the SSE connection
699+
// and end the ReadableStream, causing useChat to finalize.
700+
const activeStream = this.activeStreams.get(chatId);
701+
if (activeStream) {
702+
activeStream.abort();
703+
this.activeStreams.delete(chatId);
704+
}
705+
706+
return true;
707+
};
708+
635709
/**
636710
* Get the current session state for a chat, suitable for external persistence.
637711
*
@@ -1140,4 +1214,3 @@ export {
11401214
type InferChatClientData,
11411215
type InferChatUIMessage,
11421216
} from "./chat-client.js";
1143-

0 commit comments

Comments
 (0)