Skip to content

Commit 0d5e212

Browse files
committed
fix(core,sdk): drain cursor, scan timeout, seqNum NaN guard
Three small follow-ups from the code-review pass. `StandardSessionStreamManager.on()`'s buffer-drain now advances the committed-consume cursor (`lastDispatchedSeqNums`) to the highest buffered seq before deleting the buffer. Drained records are removed from the buffer and no longer reachable via `once()`, so they're effectively consumed from the manager's perspective — without the advance, a worker using `messagesInput.on()` (pendingMessages mode) would persist a `.in` cursor that lags behind records the handler already processed and the next boot would re-deliver them. `findLatestSessionInCursor`'s `timeoutInSeconds` raised from 1 to 5. S2 trim is eventually-consistent over 10-60s; a worker booting just after a trim could still see pre-trim records, and 1s isn't always enough headroom to drain them and read the latest turn-complete header before the SSE long-poll closes. Without the cursor seed, `.in` opens from seq=0 and the slim-wire merge catches the dupes — correct but the optimization is silently lost. `subscribeToSessionStream`'s control-event dispatch replaces `Number.parseInt(part.id, 10) || 0` with an explicit `Number.isFinite` guard that drops the event when the id is non-numeric, rather than firing `onControl` with a misleading `seqNum: 0`. Doesn't matter in practice (S2 always emits numeric ids), but `findLatestSessionInCursor` and the dashboard rely on the seqNum being meaningful for resume.
1 parent f8f9898 commit 0d5e212

3 files changed

Lines changed: 33 additions & 2 deletions

File tree

packages/core/src/v3/apiClient/index.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1346,10 +1346,21 @@ export class ApiClient {
13461346
// Trigger control record — route to onControl, never enqueue.
13471347
const subtype = controlSubtype(part.headers);
13481348
if (subtype) {
1349+
// `part.id` is the S2 seq_num in decimal string form.
1350+
// `parseInt` returns NaN if S2 ever surfaces a non-numeric
1351+
// id (shouldn't happen, but `|| 0` would mask it as a real
1352+
// seq 0). Drop the malformed event rather than fire
1353+
// `onControl` with a misleading cursor — callers like
1354+
// `findLatestSessionInCursor` and the dashboard rely on the
1355+
// seqNum being meaningful for resume.
1356+
const parsedSeqNum = Number.parseInt(part.id, 10);
1357+
if (!Number.isFinite(parsedSeqNum)) {
1358+
return;
1359+
}
13491360
onControl?.({
13501361
subtype,
13511362
headers: part.headers ?? [],
1352-
seqNum: Number.parseInt(part.id, 10) || 0,
1363+
seqNum: parsedSeqNum,
13531364
timestamp: part.timestamp,
13541365
});
13551366
return;

packages/core/src/v3/sessionStreams/manager.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,20 @@ export class StandardSessionStreamManager implements SessionStreamManager {
118118
for (const data of buffered) {
119119
this.#invokeHandler(handler, data);
120120
}
121+
// Advance the committed-consume cursor to the highest seq drained
122+
// into the new handler. `on()`-drain removes the records from the
123+
// buffer, so they're no longer available to a future `once()` —
124+
// from the manager's perspective they've been consumed. Without
125+
// this, a worker that uses `messagesInput.on()` for user-message
126+
// delivery (pendingMessages mode) would persist a `.in` cursor
127+
// that lags behind the records the handler already processed, and
128+
// the next boot would re-deliver them.
129+
const seqList = this.bufferSeqNums.get(key);
130+
if (seqList) {
131+
for (const s of seqList) {
132+
if (s !== undefined) this.#advanceLastDispatched(key, s);
133+
}
134+
}
121135
this.buffer.delete(key);
122136
// Keep `bufferSeqNums` in lock-step with `buffer` — without this,
123137
// the parallel array desyncs and the next `#dispatch` that buffers

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,13 @@ async function findLatestSessionInCursor(
178178
const apiClient = apiClientManager.clientOrThrow();
179179
let latestCursor: number | undefined;
180180
const stream = await apiClient.subscribeToSessionStream<unknown>(chatId, "out", {
181-
timeoutInSeconds: 1,
181+
// 5s rather than 1s: S2 trim is eventually-consistent (10-60s
182+
// window), so a worker booting just after a trim could still see
183+
// pre-trim records and need a bit longer to drain them all before
184+
// the SSE long-poll closes. Without enough headroom the scan would
185+
// fall back to `undefined`, the `.in` cursor wouldn't be seeded,
186+
// and the next subscribe would replay messages already processed.
187+
timeoutInSeconds: 5,
182188
onControl: (event) => {
183189
if (event.subtype !== TRIGGER_CONTROL_SUBTYPE.TURN_COMPLETE) return;
184190
const raw = headerValue(event.headers, SESSION_IN_EVENT_ID_HEADER);

0 commit comments

Comments
 (0)