Skip to content

Commit ef54cb9

Browse files
committed
fix(webapp,run-engine): close cross-table gaps in the task_run_v2 mixed window
Routes that walk the run hierarchy through a Prisma relation only see one physical table, so during a runTableV2 flag flip (a parent and child on opposite tables) they silently miss the cross-table run. This closes the reachable cases:
- cancelRun resolves child runs across both tables, so cancelling a parent cascades to a child in the other table instead of leaving it executing and holding concurrency.
- updateMetadata routes metadata.parent/root operations to the scalar parent/root id, so they reach a parent in the other table instead of falling back to the child run.
- a one-time-use token with no idempotency key now takes a cross-table claim for v2 orgs, so two presentations straddling a flip cannot each mint a run in a different table.
- the Electric shape merge reports up-to-date only when both tables are caught up, so a multi-chunk initial snapshot no longer drops the rows that arrive after the first chunk.
1 parent c4d8c4b commit ef54cb9

8 files changed

Lines changed: 441 additions & 32 deletions

File tree

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 81 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,73 @@ export class IdempotencyKeyConcern {
215215
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); // 30 days
216216

217217
if (!idempotencyKey) {
218+
// A one-time-use token with NO idempotency key would otherwise skip the
219+
// claim path below entirely. During a `runTableV2` flag flip, two
220+
// concurrent presentations of the same token can mint into DIFFERENT
221+
// physical tables (cuid -> TaskRun, ksuid -> task_run_v2); the per-table
222+
// unique constraint on `oneTimeUseToken` can't see across the two tables,
223+
// so neither INSERT raises P2002 and one token spawns two runs. For
224+
// v2-cutover orgs, serialise on the token via a Redis claim so the first
225+
// presentation wins and the rest resolve to it. Excludes
226+
// resumeParentOnCompletion (triggerAndWait) to match the buffer
227+
// fallback's handling — a one-time PUBLIC_JWT token is a fire-and-forget
228+
// public trigger, not a parent/child wait, so that case is left to the
229+
// per-table constraint.
230+
const oneTimeUseToken = request.options?.oneTimeUseToken;
231+
if (oneTimeUseToken && !request.body.options?.resumeParentOnCompletion) {
232+
const orgFeatureFlags =
233+
(request.environment.organization?.featureFlags as
234+
| Record<string, unknown>
235+
| null
236+
| undefined) ?? null;
237+
if (shouldUseV2RunTable(orgFeatureFlags)) {
238+
// Namespace the claim key so a token can never collide with a real
239+
// idempotency key in the same (envId, taskIdentifier) slot. The TTL is
240+
// a fixed pipeline-dwell bound, NOT the customer idempotencyKeyTTL:
241+
// there is no idempotency key in this path, so a client-supplied TTL
242+
// has no meaning here, and a tiny value would expire the claim
243+
// mid-flight and reopen the cross-table dup window.
244+
const claimKey = `otu:${oneTimeUseToken}`;
245+
const outcome = await claimOrAwait({
246+
envId: request.environment.id,
247+
taskIdentifier: request.taskId,
248+
idempotencyKey: claimKey,
249+
ttlSeconds: env.TRIGGER_MOLLIFIER_CLAIM_TTL_SECONDS,
250+
safetyNetMs: env.TRIGGER_MOLLIFIER_CLAIM_WAIT_MS,
251+
pollStepMs: env.TRIGGER_MOLLIFIER_CLAIM_POLL_MS,
252+
});
253+
if (outcome.kind === "resolved") {
254+
// A concurrent presentation of the same one-time token already won
255+
// and committed a run. Reject this one exactly as the within-table
256+
// path does (the per-table oneTimeUseToken unique constraint raises
257+
// P2002 -> RunOneTimeUseTokenError -> this same 4xx), preserving the
258+
// "token already used" contract while closing the cross-table gap.
259+
throw new ServiceValidationError(
260+
`Cannot trigger ${request.taskId} with a one-time use token as it has already been used.`
261+
);
262+
} else if (outcome.kind === "timed_out") {
263+
throw new ServiceValidationError(
264+
"One-time-use token claim resolution timed out",
265+
503
266+
);
267+
} else if (outcome.kind === "claimed") {
268+
// We own the claim. The trigger pipeline MUST publish (on success)
269+
// or release (on error) it — wired through the returned `claim`,
270+
// exactly like the idempotency-keyed path.
271+
return {
272+
isCached: false,
273+
idempotencyKey,
274+
idempotencyKeyExpiresAt,
275+
claim: {
276+
envId: request.environment.id,
277+
taskIdentifier: request.taskId,
278+
idempotencyKey: claimKey,
279+
token: outcome.token,
280+
},
281+
};
282+
}
283+
}
284+
}
218285
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
219286
}
220287

@@ -329,17 +396,22 @@ export class IdempotencyKeyConcern {
329396
| Record<string, unknown>
330397
| null
331398
| undefined) ?? null;
332-
// v2-cutover orgs: ANY idempotency-keyed trigger can straddle a
333-
// `runTableV2` flag flip into different physical tables (cuid -> TaskRun,
334-
// ksuid -> task_run_v2), so the claim must serialise all of them —
335-
// including triggerAndWait (resumeParentOnCompletion), debounce, and
336-
// oneTimeUseToken, whose per-table unique constraints (idempotencyKey,
337-
// oneTimeUseToken) can't see across the two tables. The
399+
// v2-cutover orgs: an idempotency-keyed trigger can straddle a `runTableV2`
400+
// flag flip into different physical tables (cuid -> TaskRun, ksuid ->
401+
// task_run_v2), and the per-table idempotency-key unique constraints can't
402+
// see across the two tables, so this claim (keyed on the idempotency key)
403+
// is the only backstop that serialises same-key triggers across the flip,
404+
// including triggerAndWait (resumeParentOnCompletion) and debounce. The
338405
// resumeParentOnCompletion/debounce/oneTimeUseToken exclusions below are
339406
// mollifier-gate alignment optimisations (those requests always return
340-
// pass_through from the gate, so there's no buffer to serialise against)
341-
// and don't apply to the cross-table concern. shouldUseV2RunTable is
342-
// checked first so a v2 org skips the mollifier-flag resolve entirely.
407+
// pass_through from the gate, so there's no buffer to serialise against);
408+
// they don't apply to v2 orgs, which short-circuit to claimEligible via
409+
// shouldUseV2RunTable regardless. oneTimeUseToken triggers with NO
410+
// idempotency key are serialised separately by the token claim in the
411+
// early-return block above; the residual same-token-with-two-different-keys
412+
// case is not covered here (each key claims its own slot) and would require
413+
// a pathological client. shouldUseV2RunTable is checked first so a v2 org
414+
// skips the mollifier-flag resolve entirely.
343415
const claimEligible =
344416
shouldUseV2RunTable(orgFeatureFlags) ||
345417
(!request.body.options?.resumeParentOnCompletion &&

apps/webapp/app/services/metadata/updateMetadata.server.ts

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -354,18 +354,14 @@ export class UpdateMetadataService {
354354
metadata: true,
355355
metadataType: true,
356356
metadataVersion: true,
357-
parentTaskRun: {
358-
select: {
359-
id: true,
360-
status: true,
361-
},
362-
},
363-
rootTaskRun: {
364-
select: {
365-
id: true,
366-
status: true,
367-
},
368-
},
357+
// Scalar parent/root pointers, NOT the parentTaskRun/rootTaskRun
358+
// relations: a relation select is bound to one physical run table and
359+
// resolves to null when the parent/root lives in the other table (a
360+
// v2 child of a legacy parent in the mixed window). The scalar id is
361+
// table-agnostic, and #ingestRunOperations only needs the id — the
362+
// flusher routes by id format across both tables.
363+
parentTaskRunId: true,
364+
rootTaskRunId: true,
369365
},
370366
},
371367
this._prisma
@@ -380,11 +376,11 @@ export class UpdateMetadataService {
380376
}
381377

382378
if (body.parentOperations && body.parentOperations.length > 0) {
383-
this.#ingestRunOperations(taskRun.parentTaskRun?.id ?? taskRun.id, body.parentOperations);
379+
this.#ingestRunOperations(taskRun.parentTaskRunId ?? taskRun.id, body.parentOperations);
384380
}
385381

386382
if (body.rootOperations && body.rootOperations.length > 0) {
387-
this.#ingestRunOperations(taskRun.rootTaskRun?.id ?? taskRun.id, body.rootOperations);
383+
this.#ingestRunOperations(taskRun.rootTaskRunId ?? taskRun.id, body.rootOperations);
388384
}
389385

390386
const result = await this.#updateRunMetadata({

apps/webapp/app/services/realtime/electricShapeMerge.server.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ export type MergedShape =
5151
offset: string;
5252
cursor?: string;
5353
schema?: string;
54+
/**
55+
* The composite is up-to-date only when BOTH shapes are. An Electric
56+
* snapshot can span multiple chunks: every chunk but the last omits the
57+
* `up-to-date` control message. If one table's snapshot is still mid-fetch
58+
* (chunk 1 of N) while the other has completed, the merged response must
59+
* NOT terminate with `up-to-date` — otherwise the client believes the
60+
* whole snapshot is done, flips to live, and never fetches the remaining
61+
* chunks (silently dropping that table's overflow rows).
62+
*/
63+
upToDate: boolean;
5464
};
5565

5666
/**
@@ -146,6 +156,12 @@ export function mergeParsedShapes(
146156
offset: encodeComposite(a.offset ?? prior.offsetA, b.offset ?? prior.offsetB),
147157
cursor,
148158
schema: a.schema ?? b.schema,
159+
// Only terminate the composite when BOTH shapes have caught up; an
160+
// un-up-to-date shape (a snapshot chunk that isn't the last) keeps the
161+
// client requesting the remainder. unpolledShape() reports upToDate:true,
162+
// so a live round that returns changes from one shape and carries the
163+
// other forward still terminates iff the polled shape is itself up-to-date.
164+
upToDate: a.upToDate && b.upToDate,
149165
};
150166
}
151167

apps/webapp/app/services/realtimeClient.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,13 @@ export class RealtimeClient {
573573
responseHeaders.set("electric-schema", merged.schema);
574574
}
575575

576-
const body = JSON.stringify([...merged.changes, UP_TO_DATE_MESSAGE]);
576+
// Only append the up-to-date terminator when BOTH upstream shapes are
577+
// caught up. If one table's snapshot is still spanning chunks, omitting the
578+
// terminator keeps the client in snapshot mode fetching the rest instead of
579+
// prematurely flipping to live and dropping that table's remaining rows.
580+
const body = JSON.stringify(
581+
merged.upToDate ? [...merged.changes, UP_TO_DATE_MESSAGE] : [...merged.changes]
582+
);
577583
const finalBody =
578584
apiVersion === CURRENT_API_VERSION ? body : this.#rewriteResponseBodyForNoneApiVersion(body);
579585
return new Response(finalBody, { status: 200, headers: responseHeaders });

apps/webapp/test/electricShapeMerge.test.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,4 +198,53 @@ describe("mergeParsedShapes", () => {
198198
if (merged.mustRefetch) throw new Error("unexpected refetch");
199199
expect(merged.schema).toBe('{"id":{"type":"text"}}');
200200
});
201+
202+
it("is up-to-date only when BOTH shapes are caught up (multi-chunk snapshot guard)", () => {
203+
// Both caught up -> the composite terminates with up-to-date.
204+
const both = mergeParsedShapes(shape({ upToDate: true }), shape({ upToDate: true }), PRIOR);
205+
if (both.mustRefetch) throw new Error("unexpected refetch");
206+
expect(both.upToDate).toBe(true);
207+
208+
// Table A is mid-snapshot (chunk 1 of N: rows but no up-to-date control
209+
// message); B has completed. The composite must NOT be up-to-date — else
210+
// the client flips to live after chunk 1 and silently drops A's remaining
211+
// rows. The rows seen so far still flow through.
212+
const aMidSnapshot = mergeParsedShapes(
213+
shape({ changes: [INSERT], upToDate: false, handle: "hA", offset: "oA" }),
214+
shape({ upToDate: true, handle: "hB", offset: "oB" }),
215+
PRIOR
216+
);
217+
if (aMidSnapshot.mustRefetch) throw new Error("unexpected refetch");
218+
expect(aMidSnapshot.upToDate).toBe(false);
219+
expect(aMidSnapshot.changes).toEqual([INSERT]);
220+
221+
// Symmetric: B mid-snapshot.
222+
const bMidSnapshot = mergeParsedShapes(
223+
shape({ upToDate: true }),
224+
shape({ changes: [UPDATE], upToDate: false }),
225+
PRIOR
226+
);
227+
if (bMidSnapshot.mustRefetch) throw new Error("unexpected refetch");
228+
expect(bMidSnapshot.upToDate).toBe(false);
229+
});
230+
231+
it("a live round carrying the un-polled sibling terminates only when the polled shape is caught up", () => {
232+
// unpolledShape reports upToDate:true, so the composite terminates iff the
233+
// polled shape is itself caught up.
234+
const caughtUp = mergeParsedShapes(
235+
shape({ changes: [INSERT], upToDate: true }),
236+
unpolledShape("b", PRIOR),
237+
PRIOR
238+
);
239+
if (caughtUp.mustRefetch) throw new Error("unexpected refetch");
240+
expect(caughtUp.upToDate).toBe(true);
241+
242+
const moreComing = mergeParsedShapes(
243+
shape({ changes: [INSERT], upToDate: false }),
244+
unpolledShape("b", PRIOR),
245+
PRIOR
246+
);
247+
if (moreComing.mustRefetch) throw new Error("unexpected refetch");
248+
expect(moreComing.upToDate).toBe(false);
249+
});
201250
});
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
// Stub `~/db.server` before importing the concern — the real module eagerly
4+
// calls `prisma.$connect()` at singleton construction. The concern under test
5+
// receives its prisma via the constructor, and the one-time-token path below
6+
// reaches the claim before any DB read, so the stub is never exercised.
7+
vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} }));
8+
9+
// claimOrAwait resolves its backend through getIdempotencyClaimBuffer; script
10+
// it via a hoisted handle so each test controls the claim outcome.
11+
const h = vi.hoisted(() => ({ buffer: null as unknown }));
12+
vi.mock("~/v3/mollifier/mollifierBuffer.server", () => ({
13+
getMollifierBuffer: () => h.buffer,
14+
getIdempotencyClaimBuffer: () => h.buffer,
15+
}));
16+
// The one-time-token claim runs BEFORE the mollifier-flag resolve, but the
17+
// concern still imports the gate module; stub it so loading doesn't pull in
18+
// extra feature-flag wiring.
19+
vi.mock("~/v3/mollifier/mollifierGate.server", () => ({
20+
makeResolveMollifierFlag: () => async () => false,
21+
}));
22+
23+
import type { MollifierBuffer } from "@trigger.dev/redis-worker";
24+
import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server";
25+
import type { TriggerTaskRequest } from "~/runEngine/types";
26+
27+
function makeConcern() {
28+
return new IdempotencyKeyConcern(
29+
{
30+
taskRun: { findFirst: async () => null },
31+
taskRunV2: { findFirst: async () => null },
32+
} as never,
33+
{} as never, // engine — unused on this path
34+
{} as never // traceEventConcern — unused on this path
35+
);
36+
}
37+
38+
function makeOtuRequest(
39+
overrides: {
40+
featureFlags?: Record<string, unknown>;
41+
oneTimeUseToken?: string | undefined;
42+
resumeParentOnCompletion?: boolean;
43+
} = {}
44+
): TriggerTaskRequest {
45+
return {
46+
taskId: "my-task",
47+
environment: {
48+
id: "env_a",
49+
organizationId: "org_1",
50+
organization: { featureFlags: overrides.featureFlags ?? { runTableV2: true } },
51+
},
52+
// No idempotencyKey on purpose — this is the path the per-table
53+
// oneTimeUseToken unique constraint cannot cover across two tables.
54+
options: { oneTimeUseToken: "oneTimeUseToken" in overrides ? overrides.oneTimeUseToken : "tok-1" },
55+
body: {
56+
options: overrides.resumeParentOnCompletion ? { resumeParentOnCompletion: true } : {},
57+
},
58+
} as unknown as TriggerTaskRequest;
59+
}
60+
61+
describe("IdempotencyKeyConcern · one-time-use token cross-table claim", () => {
62+
it("v2 org: a one-time token with no idempotency key takes a claim keyed on the token", async () => {
63+
const claimIdempotency = vi.fn(async () => ({ kind: "claimed" as const }));
64+
h.buffer = {
65+
claimIdempotency,
66+
readClaim: vi.fn(async () => null),
67+
} as unknown as MollifierBuffer;
68+
69+
const result = await makeConcern().handleTriggerRequest(makeOtuRequest(), undefined);
70+
71+
expect(result.isCached).toBe(false);
72+
if (result.isCached === false) {
73+
// The trigger pipeline must publish/release this claim — keyed on the
74+
// namespaced token so it can never collide with a real idempotency key.
75+
expect(result.claim?.idempotencyKey).toBe("otu:tok-1");
76+
expect(result.claim?.envId).toBe("env_a");
77+
expect(result.claim?.taskIdentifier).toBe("my-task");
78+
}
79+
expect(claimIdempotency).toHaveBeenCalledTimes(1);
80+
expect(claimIdempotency.mock.calls[0][0]).toMatchObject({ idempotencyKey: "otu:tok-1" });
81+
});
82+
83+
it("v2 org: a concurrent winner (claim resolved) rejects the second presentation as already-used", async () => {
84+
// The winner committed a run under the token; the loser must be rejected
85+
// exactly like the within-table P2002 path, NOT allowed to mint a duplicate
86+
// into the other table.
87+
h.buffer = {
88+
claimIdempotency: vi.fn(async () => ({ kind: "resolved", runId: "run_winner" })),
89+
readClaim: vi.fn(async () => null),
90+
} as unknown as MollifierBuffer;
91+
92+
await expect(
93+
makeConcern().handleTriggerRequest(makeOtuRequest(), undefined)
94+
).rejects.toThrow(/already been used/i);
95+
});
96+
97+
it("non-v2 org: skips the token claim entirely (no Redis round-trip)", async () => {
98+
const claimIdempotency = vi.fn(async () => ({ kind: "claimed" as const }));
99+
h.buffer = {
100+
claimIdempotency,
101+
readClaim: vi.fn(async () => null),
102+
} as unknown as MollifierBuffer;
103+
104+
const result = await makeConcern().handleTriggerRequest(
105+
makeOtuRequest({ featureFlags: { mollifierEnabled: true } }),
106+
undefined
107+
);
108+
109+
expect(result.isCached).toBe(false);
110+
if (result.isCached === false) {
111+
expect(result.claim).toBeUndefined();
112+
}
113+
expect(claimIdempotency).not.toHaveBeenCalled();
114+
});
115+
116+
it("triggerAndWait one-time token: left to the per-table constraint (not claimed here)", async () => {
117+
const claimIdempotency = vi.fn(async () => ({ kind: "claimed" as const }));
118+
h.buffer = {
119+
claimIdempotency,
120+
readClaim: vi.fn(async () => null),
121+
} as unknown as MollifierBuffer;
122+
123+
const result = await makeConcern().handleTriggerRequest(
124+
makeOtuRequest({ resumeParentOnCompletion: true }),
125+
undefined
126+
);
127+
128+
expect(result.isCached).toBe(false);
129+
if (result.isCached === false) {
130+
expect(result.claim).toBeUndefined();
131+
}
132+
expect(claimIdempotency).not.toHaveBeenCalled();
133+
});
134+
135+
it("no one-time token: ordinary no-idempotency-key trigger is unaffected", async () => {
136+
const claimIdempotency = vi.fn(async () => ({ kind: "claimed" as const }));
137+
h.buffer = {
138+
claimIdempotency,
139+
readClaim: vi.fn(async () => null),
140+
} as unknown as MollifierBuffer;
141+
142+
const result = await makeConcern().handleTriggerRequest(
143+
makeOtuRequest({ oneTimeUseToken: undefined }),
144+
undefined
145+
);
146+
147+
expect(result.isCached).toBe(false);
148+
if (result.isCached === false) {
149+
expect(result.claim).toBeUndefined();
150+
}
151+
expect(claimIdempotency).not.toHaveBeenCalled();
152+
});
153+
});

0 commit comments

Comments
 (0)