Skip to content

Commit 1b7c73f

Browse files
d-cs and claude committed
fix(webapp): mollifier mutate-fallback watches Redis, not the writer
The wait-and-bounce loop for mutations racing a mid-drain run polled the PG primary on a fixed 20ms cadence with no jitter — up to ~100 reads per request, synchronized across concurrent waiters, piling load onto the writer exactly when mollifier is engaged to shed it.

The drainer writes the canonical PG row BEFORE it acks (sets `materialised`) or fails (deletes the entry), so the buffer entry's own state is an authoritative, already-in-Redis signal for "is the row in PG yet?". Watch that (cheap Redis getEntry) instead, and touch the primary exactly once — for the actual mutation — only after it resolves. Poll gaps now use jittered exponential backoff (20ms → 250ms cap).

Drops the per-poll PG timeout race (DEFAULT_PG_TIMEOUT_MS / pgTimeoutMs / findRunInPgWithTimeout), unneeded now that PG is read once rather than in a tight loop.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b5d7213 commit 1b7c73f

3 files changed

Lines changed: 176 additions & 69 deletions

File tree

.server-changes/mollifier-mutations.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ area: webapp
33
type: feature
44
---
55

6-
Mollifier API mutations on buffered runs: tag, metadata, replay, reschedule, cancel, and idempotency-key reset via a buffer-snapshot fallback.
6+
Mollifier API mutations on buffered runs: tag, metadata, replay, reschedule, cancel, and idempotency-key reset via a buffer-snapshot fallback. When a mutation races a mid-drain run, the wait-and-bounce loop watches the buffer entry in Redis (cheap) and reads the primary exactly once for the actual mutation, instead of polling the writer on a fixed cadence; polls use jittered exponential backoff.

apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts

Lines changed: 42 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@ import { getMollifierBuffer } from "./mollifierBuffer.server";
1010

1111
// Wait/retry knobs per Q3 design. Exported for tests.
1212
export const DEFAULT_SAFETY_NET_MS = 2_000;
13+
// Initial gap between buffer polls; grows by BACKOFF_FACTOR up to
14+
// DEFAULT_MAX_POLL_STEP_MS so a slow drain doesn't poll at a tight fixed
15+
// cadence for the whole safety-net budget.
1316
export const DEFAULT_POLL_STEP_MS = 20;
14-
export const DEFAULT_PG_TIMEOUT_MS = 50;
17+
export const DEFAULT_MAX_POLL_STEP_MS = 250;
18+
const BACKOFF_FACTOR = 1.7;
1519

1620
export type MutateWithFallbackInput<TResponse> = {
1721
runId: string;
@@ -28,13 +32,16 @@ export type MutateWithFallbackInput<TResponse> = {
2832
// Override defaults for tests.
2933
safetyNetMs?: number;
3034
pollStepMs?: number;
31-
pgTimeoutMs?: number;
35+
maxPollStepMs?: number;
3236
// Test injection.
3337
getBuffer?: () => MollifierBuffer | null;
3438
prismaWriter?: TaskRunReader;
3539
prismaReplica?: TaskRunReader;
3640
sleep?: (ms: number) => Promise<void>;
3741
now?: () => number;
42+
// Jitter source; defaults to Math.random. Inject `() => 0` for
43+
// deterministic poll timing in tests.
44+
random?: () => number;
3845
};
3946

4047
export type MutateWithFallbackOutcome<TResponse> =
@@ -92,32 +99,49 @@ export async function mutateWithFallback<TResponse>(
9299
return { kind: "not_found" };
93100
}
94101

95-
// result === "busy" — entry is DRAINING / FAILED / materialised. Wait
96-
// for the drainer to terminate the entry into PG (success or
97-
// SYSTEM_FAILURE) and route through pgMutation.
102+
// result === "busy" — the entry is mid-handoff (DRAINING) or already
103+
// materialised. We do NOT poll the primary for the row to appear: that
104+
// piles read load onto the writer at exactly the moment mollifier exists
105+
// to shed it. Instead we watch the buffer entry itself (cheap Redis
106+
// reads). The drainer writes the PG row BEFORE it acks (sets
107+
// `materialised`) or fails (deletes the entry), so the entry's own state
108+
// is an authoritative, already-in-Redis signal for "is the row in PG
109+
// yet?". Only once it resolves do we touch the primary — exactly once,
110+
// for the real mutation.
98111
const safetyNetMs = input.safetyNetMs ?? DEFAULT_SAFETY_NET_MS;
99-
const pollStepMs = input.pollStepMs ?? DEFAULT_POLL_STEP_MS;
100-
const pgTimeoutMs = input.pgTimeoutMs ?? DEFAULT_PG_TIMEOUT_MS;
112+
const maxPollStepMs = input.maxPollStepMs ?? DEFAULT_MAX_POLL_STEP_MS;
113+
const random = input.random ?? Math.random;
101114
const deadline = now() + safetyNetMs;
115+
let step = input.pollStepMs ?? DEFAULT_POLL_STEP_MS;
102116

103117
while (now() < deadline) {
104118
if (input.abortSignal?.aborted) {
105119
return { kind: "timed_out" };
106120
}
107121

108-
const row = await findRunInPgWithTimeout(
109-
writer,
110-
input.runId,
111-
input.environmentId,
112-
pgTimeoutMs,
113-
);
114-
if (row) {
115-
const response = await input.pgMutation(row);
116-
return { kind: "pg", response };
122+
const entry = await buffer.getEntry(input.runId);
123+
// Resolved when the entry is gone (`fail` deleted it after writing a
124+
// terminal SYSTEM_FAILURE row) or materialised (`ack` after a
125+
// successful trigger / cancel write). In both cases the PG row is now
126+
// committed on the primary, so read it once and route through the
127+
// canonical PG mutation path.
128+
if (entry === null || entry.materialised === true) {
129+
const row = await findRunInPg(writer, input.runId, input.environmentId);
130+
if (row) {
131+
const response = await input.pgMutation(row);
132+
return { kind: "pg", response };
133+
}
134+
// Entry gone with no PG row: the drainer's terminal write itself
135+
// failed (PG unreachable). Nothing to mutate.
136+
return { kind: "not_found" };
117137
}
118-
138+
// Still QUEUED (requeued after a retryable drain error) or DRAINING —
139+
// the run hasn't reached PG. Back off with jitter so concurrent
140+
// waiters on the same draining run don't requery in lockstep.
119141
if (now() >= deadline) break;
120-
await sleep(pollStepMs);
142+
const jittered = step + Math.floor(random() * step);
143+
await sleep(jittered);
144+
step = Math.min(Math.ceil(step * BACKOFF_FACTOR), maxPollStepMs);
121145
}
122146

123147
logger.warn("mollifier mutate-with-fallback: drainer resolution timed out", {
@@ -148,32 +172,6 @@ async function findRunInPg(
148172
});
149173
}
150174

151-
async function findRunInPgWithTimeout(
152-
client: TaskRunReader,
153-
friendlyId: string,
154-
environmentId: string,
155-
timeoutMs: number,
156-
): Promise<TaskRun | null> {
157-
// One slow PG query shouldn't burn the whole safety-net budget.
158-
// Promise.race against a timer; on timeout we treat the poll as a miss
159-
// and the outer loop tries again on the next tick.
160-
const timeoutToken = Symbol("pg-timeout");
161-
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
162-
const timeoutPromise = new Promise<typeof timeoutToken>((resolve) => {
163-
timeoutHandle = setTimeout(() => resolve(timeoutToken), timeoutMs);
164-
});
165-
try {
166-
const winner = await Promise.race([
167-
findRunInPg(client, friendlyId, environmentId),
168-
timeoutPromise,
169-
]);
170-
if (winner === timeoutToken) return null;
171-
return winner;
172-
} finally {
173-
if (timeoutHandle) clearTimeout(timeoutHandle);
174-
}
175-
}
176-
177175
function defaultSleep(ms: number): Promise<void> {
178176
return new Promise((resolve) => setTimeout(resolve, ms));
179177
}

apps/webapp/test/mollifierMutateWithFallback.test.ts

Lines changed: 133 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ vi.mock("~/db.server", () => ({
66
}));
77

88
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
9-
import type { MollifierBuffer, MutateSnapshotResult } from "@trigger.dev/redis-worker";
9+
import type {
10+
BufferEntry,
11+
MollifierBuffer,
12+
MutateSnapshotResult,
13+
} from "@trigger.dev/redis-worker";
1014
import type { TaskRun } from "@trigger.dev/database";
1115

1216
type FindFirst = ReturnType<typeof vi.fn>;
@@ -22,9 +26,30 @@ function fakePrisma(rows: Array<TaskRun | null>): PrismaStub {
2226
function bufferReturning(result: MutateSnapshotResult): MollifierBuffer {
2327
return {
2428
mutateSnapshot: vi.fn(async () => result),
29+
getEntry: vi.fn(async () => null),
2530
} as unknown as MollifierBuffer;
2631
}
2732

33+
// Buffer whose mutateSnapshot returns "busy" and whose getEntry walks a
34+
// scripted sequence of entry states (the drainer's progress). The last
35+
// element repeats once the sequence is exhausted.
36+
function bufferBusy(entries: Array<BufferEntry | null>): MollifierBuffer {
37+
const getEntry = vi.fn();
38+
for (const e of entries) getEntry.mockResolvedValueOnce(e);
39+
getEntry.mockResolvedValue(entries.length ? entries[entries.length - 1] : null);
40+
return {
41+
mutateSnapshot: vi.fn(async () => "busy" as const),
42+
getEntry,
43+
} as unknown as MollifierBuffer;
44+
}
45+
46+
const entryDraining = (): BufferEntry =>
47+
({ status: "DRAINING", materialised: false }) as unknown as BufferEntry;
48+
const entryQueued = (): BufferEntry =>
49+
({ status: "QUEUED", materialised: false }) as unknown as BufferEntry;
50+
const entryMaterialised = (): BufferEntry =>
51+
({ status: "DRAINING", materialised: true }) as unknown as BufferEntry;
52+
2853
const fakeRun = (overrides: Partial<TaskRun> = {}): TaskRun =>
2954
({
3055
id: "pg_id",
@@ -101,55 +126,138 @@ describe("mutateWithFallback", () => {
101126
expect(pgMutation).toHaveBeenCalledWith(row);
102127
});
103128

104-
it("replica miss + buffer busy + writer resolves mid-wait → pgMutation", async () => {
129+
it("busy → watches buffer through DRAINING, materialises, hits primary exactly once", async () => {
105130
const row = fakeRun();
106131
const pgMutation = vi.fn(async () => "pg-after-wait");
107-
// Replica misses; writer misses twice, then hits.
108-
const writer = fakePrisma([null, null, row]);
132+
// Writer is read ONCE, only after the buffer reports materialised.
133+
const writer = fakePrisma([row]);
134+
const buffer = bufferBusy([entryDraining(), entryDraining(), entryMaterialised()]);
109135
let nowValue = 0;
110136
const result = await mutateWithFallback({
111137
...baseInput,
112138
pgMutation,
113139
synthesisedResponse: () => "snap",
114140
prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
115141
prismaWriter: writer as unknown as typeof import("~/db.server").prisma,
116-
getBuffer: () => bufferReturning("busy"),
117-
sleep: async () => {
118-
nowValue += 20;
142+
getBuffer: () => buffer,
143+
sleep: async (ms) => {
144+
nowValue += ms;
119145
},
120146
now: () => nowValue,
121147
safetyNetMs: 2000,
122148
pollStepMs: 20,
123-
pgTimeoutMs: 50,
149+
random: () => 0,
124150
});
125151
expect(result).toEqual({ kind: "pg", response: "pg-after-wait" });
126152
expect(pgMutation).toHaveBeenCalledWith(row);
127-
// Writer should have been polled 3 times before the hit.
128-
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(3);
153+
// Detection happened against Redis (3 polls), the primary exactly once.
154+
expect(buffer.getEntry).toHaveBeenCalledTimes(3);
155+
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1);
156+
});
157+
158+
it("busy → entry deleted by terminal fail, writer finds SYSTEM_FAILURE row → pgMutation", async () => {
159+
const row = fakeRun();
160+
const pgMutation = vi.fn(async () => "pg-failed-row");
161+
const writer = fakePrisma([row]);
162+
const buffer = bufferBusy([entryDraining(), null]);
163+
let nowValue = 0;
164+
const result = await mutateWithFallback({
165+
...baseInput,
166+
pgMutation,
167+
synthesisedResponse: () => "snap",
168+
prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
169+
prismaWriter: writer as unknown as typeof import("~/db.server").prisma,
170+
getBuffer: () => buffer,
171+
sleep: async (ms) => {
172+
nowValue += ms;
173+
},
174+
now: () => nowValue,
175+
safetyNetMs: 2000,
176+
pollStepMs: 20,
177+
random: () => 0,
178+
});
179+
expect(result).toEqual({ kind: "pg", response: "pg-failed-row" });
180+
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1);
181+
});
182+
183+
it("busy → entry deleted but no PG row (terminal write failed) → not_found", async () => {
184+
const buffer = bufferBusy([null]);
185+
const writer = fakePrisma([null]);
186+
let nowValue = 0;
187+
const result = await mutateWithFallback({
188+
...baseInput,
189+
pgMutation: async () => "pg",
190+
synthesisedResponse: () => "snap",
191+
prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
192+
prismaWriter: writer as unknown as typeof import("~/db.server").prisma,
193+
getBuffer: () => buffer,
194+
sleep: async (ms) => {
195+
nowValue += ms;
196+
},
197+
now: () => nowValue,
198+
safetyNetMs: 2000,
199+
pollStepMs: 20,
200+
random: () => 0,
201+
});
202+
expect(result).toEqual({ kind: "not_found" });
203+
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1);
204+
});
205+
206+
it("busy → requeued (back to QUEUED) then materialises; doesn't resolve early", async () => {
207+
const row = fakeRun();
208+
const pgMutation = vi.fn(async () => "pg-after-requeue");
209+
const writer = fakePrisma([row]);
210+
// QUEUED (requeued after a retryable drain error) must NOT be treated
211+
// as "done" — the run hasn't reached PG. Only the later materialise does.
212+
const buffer = bufferBusy([entryQueued(), entryDraining(), entryMaterialised()]);
213+
let nowValue = 0;
214+
const result = await mutateWithFallback({
215+
...baseInput,
216+
pgMutation,
217+
synthesisedResponse: () => "snap",
218+
prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
219+
prismaWriter: writer as unknown as typeof import("~/db.server").prisma,
220+
getBuffer: () => buffer,
221+
sleep: async (ms) => {
222+
nowValue += ms;
223+
},
224+
now: () => nowValue,
225+
safetyNetMs: 2000,
226+
pollStepMs: 20,
227+
random: () => 0,
228+
});
229+
expect(result).toEqual({ kind: "pg", response: "pg-after-requeue" });
230+
expect(buffer.getEntry).toHaveBeenCalledTimes(3);
231+
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1);
129232
});
130233

131-
it("replica miss + buffer busy + drainer never resolves → timed_out", async () => {
234+
it("busy → drainer never resolves (stays DRAINING) → timed_out, primary never touched", async () => {
235+
const writer = fakePrisma([]);
236+
const buffer = bufferBusy([entryDraining()]);
132237
let nowValue = 0;
133238
const result = await mutateWithFallback({
134239
...baseInput,
135240
pgMutation: async () => "pg",
136241
synthesisedResponse: () => "snap",
137242
prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
138-
prismaWriter: fakePrisma([null, null, null, null, null]) as unknown as typeof import("~/db.server").prisma,
139-
getBuffer: () => bufferReturning("busy"),
140-
sleep: async () => {
141-
nowValue += 20;
243+
prismaWriter: writer as unknown as typeof import("~/db.server").prisma,
244+
getBuffer: () => buffer,
245+
sleep: async (ms) => {
246+
nowValue += ms;
142247
},
143248
now: () => nowValue,
144-
safetyNetMs: 60,
249+
safetyNetMs: 100,
145250
pollStepMs: 20,
146-
pgTimeoutMs: 5,
251+
random: () => 0,
147252
});
148253
expect(result).toEqual({ kind: "timed_out" });
254+
// The whole point: while the run is still draining we never read the primary.
255+
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(0);
149256
});
150257

151258
it("abort signal during wait → timed_out without further polls", async () => {
152-
const writer = fakePrisma([null, null, null]);
259+
const writer = fakePrisma([]);
260+
const buffer = bufferBusy([entryDraining(), entryDraining()]);
153261
const controller = new AbortController();
154262
let nowValue = 0;
155263
const result = await mutateWithFallback({
@@ -158,20 +266,21 @@ describe("mutateWithFallback", () => {
158266
synthesisedResponse: () => "snap",
159267
prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
160268
prismaWriter: writer as unknown as typeof import("~/db.server").prisma,
161-
getBuffer: () => bufferReturning("busy"),
162-
sleep: async () => {
163-
nowValue += 20;
269+
getBuffer: () => buffer,
270+
sleep: async (ms) => {
271+
nowValue += ms;
164272
controller.abort();
165273
},
166274
now: () => nowValue,
167275
safetyNetMs: 2000,
168276
pollStepMs: 20,
169-
pgTimeoutMs: 5,
277+
random: () => 0,
170278
abortSignal: controller.signal,
171279
});
172280
expect(result).toEqual({ kind: "timed_out" });
173-
// One poll happened before the sleep+abort.
174-
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1);
281+
// One buffer poll happened before the sleep+abort; primary untouched.
282+
expect(buffer.getEntry).toHaveBeenCalledTimes(1);
283+
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(0);
175284
});
176285

177286
it("buffer is null (mollifier disabled) → not_found after replica miss", async () => {

0 commit comments

Comments
 (0)