@@ -2,13 +2,38 @@ import { RunId } from "@trigger.dev/core/v3/isomorphic";
22import type { PrismaClientOrTransaction , TaskRun } from "@trigger.dev/database" ;
33import { logger } from "~/services/logger.server" ;
44import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server" ;
5+ import { ServiceValidationError } from "~/v3/services/common.server" ;
56import type { RunEngine } from "~/v3/runEngine.server" ;
67import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus" ;
8+ import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server" ;
9+ import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server" ;
10+ import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server" ;
711import type { TraceEventConcern , TriggerTaskRequest } from "../types" ;
812
13+ // Claim ownership context returned to the caller when the
14+ // IdempotencyKeyConcern won a pre-gate claim. Caller MUST publish the
15+ // winning runId on pipeline success (`publishClaim`) or release the
16+ // claim on failure (`releaseClaim`).
17+ export type ClaimedIdempotency = {
18+ envId : string ;
19+ taskIdentifier : string ;
20+ idempotencyKey : string ;
21+ } ;
22+
923export type IdempotencyKeyConcernResult =
1024 | { isCached : true ; run : TaskRun }
11- | { isCached : false ; idempotencyKey ?: string ; idempotencyKeyExpiresAt ?: Date } ;
25+ | {
26+ isCached : false ;
27+ idempotencyKey ?: string ;
28+ idempotencyKeyExpiresAt ?: Date ;
29+ // Set when this trigger holds a pre-gate claim. The caller's
30+ // trigger pipeline MUST resolve the claim by either publishing
31+ // the runId on success or releasing on failure. Undefined when
32+ // the request has no idempotency key, when the buffer is
33+ // unavailable, or when the request is a triggerAndWait (claim
34+ // path skipped per plan doc).
35+ claim ?: ClaimedIdempotency ;
36+ } ;
1237
1338export class IdempotencyKeyConcern {
1439 constructor (
@@ -17,6 +42,47 @@ export class IdempotencyKeyConcern {
1742 private readonly traceEventConcern : TraceEventConcern
1843 ) { }
1944
45+ // Q5 buffer-side dedup. Resolves an idempotency key against the
46+ // mollifier buffer when PG missed. Returns a SyntheticRun cast to
47+ // TaskRun so the route handler (which only reads run.id / run.friendlyId)
48+ // can echo the buffered run's friendlyId as a cached hit. Returns null
49+ // for any failure or miss — buffer outages must not 500 the trigger
50+ // hot path; we fail open to "no cache hit" and let the request through.
51+ private async findBufferedRunWithIdempotency (
52+ environmentId : string ,
53+ organizationId : string ,
54+ taskIdentifier : string ,
55+ idempotencyKey : string ,
56+ ) : Promise < TaskRun | null > {
57+ const buffer = getMollifierBuffer ( ) ;
58+ if ( ! buffer ) return null ;
59+
60+ let bufferedRunId : string | null ;
61+ try {
62+ bufferedRunId = await buffer . lookupIdempotency ( {
63+ envId : environmentId ,
64+ taskIdentifier,
65+ idempotencyKey,
66+ } ) ;
67+ } catch ( err ) {
68+ logger . error ( "IdempotencyKeyConcern: buffer lookupIdempotency failed" , {
69+ environmentId,
70+ taskIdentifier,
71+ err : err instanceof Error ? err . message : String ( err ) ,
72+ } ) ;
73+ return null ;
74+ }
75+ if ( ! bufferedRunId ) return null ;
76+
77+ const synthetic = await findRunByIdWithMollifierFallback ( {
78+ runId : bufferedRunId ,
79+ environmentId,
80+ organizationId,
81+ } ) ;
82+ if ( ! synthetic ) return null ;
83+ return synthetic as unknown as TaskRun ;
84+ }
85+
2086 async handleTriggerRequest (
2187 request : TriggerTaskRequest ,
2288 parentStore : string | undefined
@@ -44,6 +110,25 @@ export class IdempotencyKeyConcern {
44110 } )
45111 : undefined ;
46112
113+ // Buffer fallback per Q5 mollifier-idempotency design. PG missed —
114+ // the same key may belong to a buffered run that hasn't materialised
115+ // yet. Skipped when `resumeParentOnCompletion` is set: blocking a
116+ // parent on a buffered child via waitpoint requires a PG row that
117+ // doesn't exist yet. The follow-up accept's SETNX in mollifyTrigger
118+ // still dedupes the trigger itself; the waitpoint just doesn't fire
119+ // for this rare race window.
120+ if ( ! existingRun && idempotencyKey && ! request . body . options ?. resumeParentOnCompletion ) {
121+ const buffered = await this . findBufferedRunWithIdempotency (
122+ request . environment . id ,
123+ request . environment . organizationId ,
124+ request . taskId ,
125+ idempotencyKey ,
126+ ) ;
127+ if ( buffered ) {
128+ return { isCached : true , run : buffered } ;
129+ }
130+ }
131+
47132 if ( existingRun ) {
48133 // The idempotency key has expired
49134 if ( existingRun . idempotencyKeyExpiresAt && existingRun . idempotencyKeyExpiresAt < new Date ( ) ) {
@@ -133,6 +218,81 @@ export class IdempotencyKeyConcern {
133218 return { isCached : true , run : existingRun } ;
134219 }
135220
221+ // Pre-gate claim — closes the PG+buffer race during gate transition
222+ // (see _plans/2026-05-21-mollifier-idempotency-claim.md). All
223+ // same-key triggers serialise here before evaluateGate decides
224+ // PG-pass-through vs mollify. Skipped for triggerAndWait
225+ // (resumeParentOnCompletion) — that path bypasses the gate via F4
226+ // and its existing PG-side dedup is sufficient.
227+ if ( ! request . body . options ?. resumeParentOnCompletion ) {
228+ const ttlSeconds = Math . max (
229+ 1 ,
230+ Math . min (
231+ 30 ,
232+ Math . ceil ( ( idempotencyKeyExpiresAt . getTime ( ) - Date . now ( ) ) / 1000 ) ,
233+ ) ,
234+ ) ;
235+ const outcome = await claimOrAwait ( {
236+ envId : request . environment . id ,
237+ taskIdentifier : request . taskId ,
238+ idempotencyKey,
239+ ttlSeconds,
240+ } ) ;
241+ if ( outcome . kind === "resolved" ) {
242+ // Another concurrent trigger committed first. Re-resolve via the
243+ // existing checks: writer-side PG findFirst first (defeats
244+ // replica lag), then buffer fallback for the buffered case.
245+ const writerRun = await this . prisma . taskRun . findFirst ( {
246+ where : {
247+ runtimeEnvironmentId : request . environment . id ,
248+ idempotencyKey,
249+ taskIdentifier : request . taskId ,
250+ } ,
251+ include : { associatedWaitpoint : true } ,
252+ } ) ;
253+ if ( writerRun ) {
254+ return { isCached : true , run : writerRun } ;
255+ }
256+ const buffered = await this . findBufferedRunWithIdempotency (
257+ request . environment . id ,
258+ request . environment . organizationId ,
259+ request . taskId ,
260+ idempotencyKey ,
261+ ) ;
262+ if ( buffered ) {
263+ return { isCached : true , run : buffered } ;
264+ }
265+ // Claim resolved to a runId nothing can find — likely the
266+ // claimant errored after publish, or the row TTL'd out. Log
267+ // and fall through to a fresh trigger.
268+ logger . warn ( "idempotency claim resolved but runId not findable" , {
269+ envId : request . environment . id ,
270+ taskIdentifier : request . taskId ,
271+ claimedRunId : outcome . runId ,
272+ } ) ;
273+ }
274+ if ( outcome . kind === "timed_out" ) {
275+ throw new ServiceValidationError (
276+ "Idempotency claim resolution timed out" ,
277+ 503 ,
278+ ) ;
279+ }
280+ if ( outcome . kind === "claimed" ) {
281+ // Caller MUST publish/release. Signalled via the result's
282+ // `claim` field.
283+ return {
284+ isCached : false ,
285+ idempotencyKey,
286+ idempotencyKeyExpiresAt,
287+ claim : {
288+ envId : request . environment . id ,
289+ taskIdentifier : request . taskId ,
290+ idempotencyKey,
291+ } ,
292+ } ;
293+ }
294+ }
295+
136296 return { isCached : false , idempotencyKey, idempotencyKeyExpiresAt } ;
137297 }
138298}
0 commit comments