Skip to content

Commit 9246ca1

Browse files
d-csclaude
andcommitted
feat(webapp): mollifier trigger-time decisions — mollify, claim, read fallback
The trigger hot path's mollifier integration: - `mollifyTrigger`: when the gate trips, write the engine.trigger snapshot to the buffer and return a synthesised QUEUED response. - Pre-gate idempotency-key claim: same-key triggers serialise through Redis so a burst lands in PG / buffer exactly once. - Read-fallback extensions: `findRunByIdWithMollifierFallback` for the trigger-time idempotency lookup that must see buffered runs. - Gate bypasses: debounce, oneTimeUseToken, parentTaskRun (triggerAndWait) skip the mollify path entirely. - triggerTask + IdempotencyKeyConcern wired to the above. Stacked on buffer extensions PR. All behaviour gated by the master `TRIGGER_MOLLIFIER_ENABLED` switch; off-state hot path is unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 02cfe1a commit 9246ca1

13 files changed

Lines changed: 1745 additions & 422 deletions

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1092,7 +1092,7 @@ const EnvironmentSchema = z
10921092
.transform((v) => v ?? process.env.REDIS_PASSWORD),
10931093
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
10941094
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
1095-
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
1095+
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().nonnegative().default(100),
10961096
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
10971097
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
10981098
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 161 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,38 @@ import { RunId } from "@trigger.dev/core/v3/isomorphic";
22
import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
33
import { logger } from "~/services/logger.server";
44
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
5+
import { ServiceValidationError } from "~/v3/services/common.server";
56
import type { RunEngine } from "~/v3/runEngine.server";
67
import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus";
8+
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
9+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
10+
import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server";
711
import type { TraceEventConcern, TriggerTaskRequest } from "../types";
812

13+
// Claim ownership context returned to the caller when the
14+
// IdempotencyKeyConcern won a pre-gate claim. Caller MUST publish the
15+
// winning runId on pipeline success (`publishClaim`) or release the
16+
// claim on failure (`releaseClaim`).
17+
export type ClaimedIdempotency = {
18+
envId: string;
19+
taskIdentifier: string;
20+
idempotencyKey: string;
21+
};
22+
923
export type IdempotencyKeyConcernResult =
1024
| { isCached: true; run: TaskRun }
11-
| { isCached: false; idempotencyKey?: string; idempotencyKeyExpiresAt?: Date };
25+
| {
26+
isCached: false;
27+
idempotencyKey?: string;
28+
idempotencyKeyExpiresAt?: Date;
29+
// Set when this trigger holds a pre-gate claim. The caller's
30+
// trigger pipeline MUST resolve the claim by either publishing
31+
// the runId on success or releasing on failure. Undefined when
32+
// the request has no idempotency key, when the buffer is
33+
// unavailable, or when the request is a triggerAndWait (claim
34+
// path skipped per plan doc).
35+
claim?: ClaimedIdempotency;
36+
};
1237

1338
export class IdempotencyKeyConcern {
1439
constructor(
@@ -17,6 +42,47 @@ export class IdempotencyKeyConcern {
1742
private readonly traceEventConcern: TraceEventConcern
1843
) {}
1944

45+
// Q5 buffer-side dedup. Resolves an idempotency key against the
46+
// mollifier buffer when PG missed. Returns a SyntheticRun cast to
47+
// TaskRun so the route handler (which only reads run.id / run.friendlyId)
48+
// can echo the buffered run's friendlyId as a cached hit. Returns null
49+
// for any failure or miss — buffer outages must not 500 the trigger
50+
// hot path; we fail open to "no cache hit" and let the request through.
51+
private async findBufferedRunWithIdempotency(
52+
environmentId: string,
53+
organizationId: string,
54+
taskIdentifier: string,
55+
idempotencyKey: string,
56+
): Promise<TaskRun | null> {
57+
const buffer = getMollifierBuffer();
58+
if (!buffer) return null;
59+
60+
let bufferedRunId: string | null;
61+
try {
62+
bufferedRunId = await buffer.lookupIdempotency({
63+
envId: environmentId,
64+
taskIdentifier,
65+
idempotencyKey,
66+
});
67+
} catch (err) {
68+
logger.error("IdempotencyKeyConcern: buffer lookupIdempotency failed", {
69+
environmentId,
70+
taskIdentifier,
71+
err: err instanceof Error ? err.message : String(err),
72+
});
73+
return null;
74+
}
75+
if (!bufferedRunId) return null;
76+
77+
const synthetic = await findRunByIdWithMollifierFallback({
78+
runId: bufferedRunId,
79+
environmentId,
80+
organizationId,
81+
});
82+
if (!synthetic) return null;
83+
return synthetic as unknown as TaskRun;
84+
}
85+
2086
async handleTriggerRequest(
2187
request: TriggerTaskRequest,
2288
parentStore: string | undefined
@@ -44,6 +110,25 @@ export class IdempotencyKeyConcern {
44110
})
45111
: undefined;
46112

113+
// Buffer fallback per Q5 mollifier-idempotency design. PG missed —
114+
// the same key may belong to a buffered run that hasn't materialised
115+
// yet. Skipped when `resumeParentOnCompletion` is set: blocking a
116+
// parent on a buffered child via waitpoint requires a PG row that
117+
// doesn't exist yet. The follow-up accept's SETNX in mollifyTrigger
118+
// still dedupes the trigger itself; the waitpoint just doesn't fire
119+
// for this rare race window.
120+
if (!existingRun && idempotencyKey && !request.body.options?.resumeParentOnCompletion) {
121+
const buffered = await this.findBufferedRunWithIdempotency(
122+
request.environment.id,
123+
request.environment.organizationId,
124+
request.taskId,
125+
idempotencyKey,
126+
);
127+
if (buffered) {
128+
return { isCached: true, run: buffered };
129+
}
130+
}
131+
47132
if (existingRun) {
48133
// The idempotency key has expired
49134
if (existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date()) {
@@ -133,6 +218,81 @@ export class IdempotencyKeyConcern {
133218
return { isCached: true, run: existingRun };
134219
}
135220

221+
// Pre-gate claim — closes the PG+buffer race during gate transition
222+
// (see _plans/2026-05-21-mollifier-idempotency-claim.md). All
223+
// same-key triggers serialise here before evaluateGate decides
224+
// PG-pass-through vs mollify. Skipped for triggerAndWait
225+
// (resumeParentOnCompletion) — that path bypasses the gate via F4
226+
// and its existing PG-side dedup is sufficient.
227+
if (!request.body.options?.resumeParentOnCompletion) {
228+
const ttlSeconds = Math.max(
229+
1,
230+
Math.min(
231+
30,
232+
Math.ceil((idempotencyKeyExpiresAt.getTime() - Date.now()) / 1000),
233+
),
234+
);
235+
const outcome = await claimOrAwait({
236+
envId: request.environment.id,
237+
taskIdentifier: request.taskId,
238+
idempotencyKey,
239+
ttlSeconds,
240+
});
241+
if (outcome.kind === "resolved") {
242+
// Another concurrent trigger committed first. Re-resolve via the
243+
// existing checks: writer-side PG findFirst first (defeats
244+
// replica lag), then buffer fallback for the buffered case.
245+
const writerRun = await this.prisma.taskRun.findFirst({
246+
where: {
247+
runtimeEnvironmentId: request.environment.id,
248+
idempotencyKey,
249+
taskIdentifier: request.taskId,
250+
},
251+
include: { associatedWaitpoint: true },
252+
});
253+
if (writerRun) {
254+
return { isCached: true, run: writerRun };
255+
}
256+
const buffered = await this.findBufferedRunWithIdempotency(
257+
request.environment.id,
258+
request.environment.organizationId,
259+
request.taskId,
260+
idempotencyKey,
261+
);
262+
if (buffered) {
263+
return { isCached: true, run: buffered };
264+
}
265+
// Claim resolved to a runId nothing can find — likely the
266+
// claimant errored after publish, or the row TTL'd out. Log
267+
// and fall through to a fresh trigger.
268+
logger.warn("idempotency claim resolved but runId not findable", {
269+
envId: request.environment.id,
270+
taskIdentifier: request.taskId,
271+
claimedRunId: outcome.runId,
272+
});
273+
}
274+
if (outcome.kind === "timed_out") {
275+
throw new ServiceValidationError(
276+
"Idempotency claim resolution timed out",
277+
503,
278+
);
279+
}
280+
if (outcome.kind === "claimed") {
281+
// Caller MUST publish/release. Signalled via the result's
282+
// `claim` field.
283+
return {
284+
isCached: false,
285+
idempotencyKey,
286+
idempotencyKeyExpiresAt,
287+
claim: {
288+
envId: request.environment.id,
289+
taskIdentifier: request.taskId,
290+
idempotencyKey,
291+
},
292+
};
293+
}
294+
}
295+
136296
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
137297
}
138298
}

0 commit comments

Comments
 (0)