Skip to content

Commit baa6f17

Browse files
d-csclaude
andcommitted
fix(webapp): thread per-claim ownership token through publish/release
The buffer's claim API now requires a caller-supplied ownership token so compare-and-act protects the slot against a stale predecessor. Wires the token end-to-end: - `claimOrAwait` generates the token (UUID) up front and reuses it across the retry path; returns it on the `claimed` outcome. - `publishClaim` and `releaseClaim` wrappers accept and forward the token to the buffer. - `ClaimedIdempotency` carries the token so the trigger pipeline can publish or release with the same token it claimed under. - `triggerTask.server.ts` threads the token into the publish call. Tests pin token round-trip: claimOrAwait → claimIdempotency, plus the publish and release pass-through. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9b4287b commit baa6f17

4 files changed

Lines changed: 120 additions & 23 deletions

File tree

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ export type ClaimedIdempotency = {
1818
envId: string;
1919
taskIdentifier: string;
2020
idempotencyKey: string;
21+
// Ownership token from `claimOrAwait`. The caller's trigger pipeline
22+
// MUST thread this into publishClaim/releaseClaim so the buffer's
23+
// compare-and-act protects the slot against a stale predecessor.
24+
token: string;
2125
};
2226

2327
export type IdempotencyKeyConcernResult =
@@ -279,7 +283,8 @@ export class IdempotencyKeyConcern {
279283
}
280284
if (outcome.kind === "claimed") {
281285
// Caller MUST publish/release. Signalled via the result's
282-
// `claim` field.
286+
// `claim` field, including the ownership token so the buffer
287+
// can compare-and-act on the slot we now own.
283288
return {
284289
isCached: false,
285290
idempotencyKey,
@@ -288,6 +293,7 @@ export class IdempotencyKeyConcern {
288293
envId: request.environment.id,
289294
taskIdentifier: request.taskId,
290295
idempotencyKey,
296+
token: outcome.token,
291297
},
292298
};
293299
}

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,7 @@ export class RunEngineTriggerTaskService {
627627
envId: idempotencyClaim.envId,
628628
taskIdentifier: idempotencyClaim.taskIdentifier,
629629
idempotencyKey: idempotencyClaim.idempotencyKey,
630+
token: idempotencyClaim.token,
630631
runId: result.run.friendlyId,
631632
});
632633
}

apps/webapp/app/v3/mollifier/idempotencyClaim.server.ts

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { randomUUID } from "node:crypto";
12
import type {
23
IdempotencyClaimResult,
34
IdempotencyLookupInput,
@@ -17,7 +18,11 @@ export const DEFAULT_CLAIM_WAIT_MS = 5_000;
1718
export const DEFAULT_CLAIM_POLL_MS = 25;
1819

1920
export type ClaimOrAwaitOutcome =
20-
| { kind: "claimed" } // we own the claim; caller proceeds with the trigger pipeline
21+
// We own the claim. `token` MUST be passed to publishClaim/releaseClaim
22+
// so the buffer can compare-and-act against our ownership marker — a
23+
// late release from a previous claimant whose TTL expired cannot
24+
// erase our slot.
25+
| { kind: "claimed"; token: string }
2126
| { kind: "resolved"; runId: string } // someone else's runId; caller returns isCached:true
2227
| { kind: "timed_out" };
2328

@@ -30,6 +35,10 @@ export type ClaimOrAwaitInput = IdempotencyLookupInput & {
3035
buffer?: MollifierBuffer | null;
3136
now?: () => number;
3237
sleep?: (ms: number) => Promise<void>;
38+
// Test override for the ownership-token generator. Defaults to
39+
// `crypto.randomUUID()`. Tests pass a deterministic value so they
40+
// can assert publish/release pass-through.
41+
generateToken?: () => string;
3342
};
3443

3544
// Pre-gate Redis claim. All same-key triggers serialise through here
@@ -50,12 +59,19 @@ export type ClaimOrAwaitInput = IdempotencyLookupInput & {
5059
// IdempotencyKeyConcern PG-first lookup.
5160
export async function claimOrAwait(input: ClaimOrAwaitInput): Promise<ClaimOrAwaitOutcome> {
5261
const buffer = input.buffer === undefined ? getMollifierBuffer() : input.buffer;
62+
const generateToken = input.generateToken ?? randomUUID;
63+
// Generate the ownership token up front so the retry loop reuses it
64+
// — we're the same logical claimant across attempts; only the slot
65+
// owner changes between releases.
66+
const token = generateToken();
5367
if (!buffer) {
5468
// Mollifier disabled / buffer construction failed. Fall open —
5569
// caller proceeds with the trigger pipeline (PG unique constraint
5670
// backstop). Without the claim machinery the race-window scenarios
57-
// from the plan doc revert to today's behaviour.
58-
return { kind: "claimed" };
71+
// from the plan doc revert to today's behaviour. Token is still
72+
// generated so callers don't have to branch on the "no buffer" case
73+
// — publish/release become buffer-null no-ops downstream.
74+
return { kind: "claimed", token };
5975
}
6076
const ttlSeconds = input.ttlSeconds ?? DEFAULT_CLAIM_TTL_SECONDS;
6177
const safetyNetMs = input.safetyNetMs ?? DEFAULT_CLAIM_WAIT_MS;
@@ -74,17 +90,17 @@ export async function claimOrAwait(input: ClaimOrAwaitInput): Promise<ClaimOrAwa
7490
// a prior burst).
7591
let result: IdempotencyClaimResult;
7692
try {
77-
result = await buffer.claimIdempotency({ ...lookupInput, ttlSeconds });
93+
result = await buffer.claimIdempotency({ ...lookupInput, token, ttlSeconds });
7894
} catch (err) {
7995
logger.warn("idempotency claim failed (fail-open)", {
8096
envId: input.envId,
8197
taskIdentifier: input.taskIdentifier,
8298
err: err instanceof Error ? err.message : String(err),
8399
});
84-
return { kind: "claimed" };
100+
return { kind: "claimed", token };
85101
}
86102

87-
if (result.kind === "claimed") return { kind: "claimed" };
103+
if (result.kind === "claimed") return { kind: "claimed", token };
88104
if (result.kind === "resolved") return result;
89105

90106
// result.kind === "pending" — wait/poll loop. May see the value flip
@@ -108,17 +124,19 @@ export async function claimOrAwait(input: ClaimOrAwaitInput): Promise<ClaimOrAwa
108124

109125
if (current === null) {
110126
// Claimant released on error. Re-attempt the claim — one of the
111-
// waiters will win, the rest see "pending" again.
127+
// waiters will win, the rest see "pending" again. Reuse our token:
128+
// we're still the same logical claimant, just contending for a
129+
// freshly empty slot.
112130
try {
113-
const retry = await buffer.claimIdempotency({ ...lookupInput, ttlSeconds });
114-
if (retry.kind === "claimed") return { kind: "claimed" };
131+
const retry = await buffer.claimIdempotency({ ...lookupInput, token, ttlSeconds });
132+
if (retry.kind === "claimed") return { kind: "claimed", token };
115133
if (retry.kind === "resolved") return retry;
116134
// "pending" again → keep polling.
117135
} catch (err) {
118136
logger.warn("idempotency claim retry failed", {
119137
err: err instanceof Error ? err.message : String(err),
120138
});
121-
return { kind: "claimed" };
139+
return { kind: "claimed", token };
122140
}
123141
continue;
124142
}
@@ -136,6 +154,10 @@ export async function publishClaim(input: {
136154
envId: string;
137155
taskIdentifier: string;
138156
idempotencyKey: string;
157+
// Ownership token from the `claimed` outcome. Buffer compare-and-sets
158+
// on this so a publish from a stale claimant (TTL expired, another
159+
// claimant moved in) is a no-op rather than overwriting their claim.
160+
token: string;
139161
runId: string;
140162
ttlSeconds?: number;
141163
buffer?: MollifierBuffer | null;
@@ -148,6 +170,7 @@ export async function publishClaim(input: {
148170
envId: input.envId,
149171
taskIdentifier: input.taskIdentifier,
150172
idempotencyKey: input.idempotencyKey,
173+
token: input.token,
151174
runId: input.runId,
152175
ttlSeconds,
153176
});
@@ -166,6 +189,10 @@ export async function releaseClaim(input: {
166189
envId: string;
167190
taskIdentifier: string;
168191
idempotencyKey: string;
192+
// Ownership token from the `claimed` outcome. Buffer compare-and-
193+
// deletes on this so a release from a stale claimant whose TTL
194+
// expired can't wipe a new owner's claim.
195+
token: string;
169196
buffer?: MollifierBuffer | null;
170197
}): Promise<void> {
171198
const buffer = input.buffer === undefined ? getMollifierBuffer() : input.buffer;
@@ -175,6 +202,7 @@ export async function releaseClaim(input: {
175202
envId: input.envId,
176203
taskIdentifier: input.taskIdentifier,
177204
idempotencyKey: input.idempotencyKey,
205+
token: input.token,
178206
});
179207
} catch (err) {
180208
logger.warn("idempotency claim release failed", {

apps/webapp/test/mollifierIdempotencyClaim.test.ts

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,12 @@ const baseInput = {
6060
describe("claimOrAwait", () => {
6161
it("returns 'claimed' for the first caller — empty key wins SETNX", async () => {
6262
const { buffer } = makeBuffer({ value: null });
63-
const outcome = await claimOrAwait({ ...baseInput, buffer });
64-
expect(outcome).toEqual({ kind: "claimed" });
63+
const outcome = await claimOrAwait({
64+
...baseInput,
65+
buffer,
66+
generateToken: () => "token-1",
67+
});
68+
expect(outcome).toEqual({ kind: "claimed", token: "token-1" });
6569
});
6670

6771
it("returns 'resolved' immediately when the key already holds a runId", async () => {
@@ -117,6 +121,7 @@ describe("claimOrAwait", () => {
117121
const outcome = await claimOrAwait({
118122
...baseInput,
119123
buffer,
124+
generateToken: () => "token-retry",
120125
now: () => nowValue,
121126
sleep: async (ms) => {
122127
nowValue += ms;
@@ -127,12 +132,16 @@ describe("claimOrAwait", () => {
127132
safetyNetMs: 1000,
128133
pollStepMs: 25,
129134
});
130-
expect(outcome).toEqual({ kind: "claimed" });
135+
expect(outcome).toEqual({ kind: "claimed", token: "token-retry" });
131136
});
132137

133138
it("fails open with 'claimed' when buffer is null (mollifier disabled)", async () => {
134-
const outcome = await claimOrAwait({ ...baseInput, buffer: null });
135-
expect(outcome).toEqual({ kind: "claimed" });
139+
const outcome = await claimOrAwait({
140+
...baseInput,
141+
buffer: null,
142+
generateToken: () => "token-fallopen-null",
143+
});
144+
expect(outcome).toEqual({ kind: "claimed", token: "token-fallopen-null" });
136145
});
137146

138147
it("fails open with 'claimed' if buffer.claimIdempotency throws (Redis down)", async () => {
@@ -141,8 +150,12 @@ describe("claimOrAwait", () => {
141150
throw new Error("ECONNREFUSED");
142151
}),
143152
} as unknown as MollifierBuffer;
144-
const outcome = await claimOrAwait({ ...baseInput, buffer });
145-
expect(outcome).toEqual({ kind: "claimed" });
153+
const outcome = await claimOrAwait({
154+
...baseInput,
155+
buffer,
156+
generateToken: () => "token-fallopen-throw",
157+
});
158+
expect(outcome).toEqual({ kind: "claimed", token: "token-fallopen-throw" });
146159
});
147160

148161
it("respects an aborted signal during the wait loop", async () => {
@@ -170,14 +183,14 @@ describe("claimOrAwait", () => {
170183
describe("publishClaim", () => {
171184
it("writes the runId to the claim key", async () => {
172185
const { buffer, state } = makeBuffer({ value: "pending" });
173-
await publishClaim({ ...baseInput, runId: "run_X", buffer });
186+
await publishClaim({ ...baseInput, token: "owner-token", runId: "run_X", buffer });
174187
expect(state.value).toBe("run_X");
175188
expect(buffer.publishClaim).toHaveBeenCalledOnce();
176189
});
177190

178191
it("no-op when buffer is null", async () => {
179192
await expect(
180-
publishClaim({ ...baseInput, runId: "run_X", buffer: null }),
193+
publishClaim({ ...baseInput, token: "owner-token", runId: "run_X", buffer: null }),
181194
).resolves.toBeUndefined();
182195
});
183196

@@ -188,19 +201,68 @@ describe("publishClaim", () => {
188201
}),
189202
} as unknown as MollifierBuffer;
190203
await expect(
191-
publishClaim({ ...baseInput, runId: "run_X", buffer }),
204+
publishClaim({ ...baseInput, token: "owner-token", runId: "run_X", buffer }),
192205
).resolves.toBeUndefined();
193206
});
194207
});
195208

196209
describe("releaseClaim", () => {
197210
it("DELs the claim so waiters can re-acquire", async () => {
198211
const { buffer, state } = makeBuffer({ value: "pending" });
199-
await releaseClaim({ ...baseInput, buffer });
212+
await releaseClaim({ ...baseInput, token: "owner-token", buffer });
200213
expect(state.value).toBeNull();
201214
});
202215

203216
it("no-op when buffer is null", async () => {
204-
await expect(releaseClaim({ ...baseInput, buffer: null })).resolves.toBeUndefined();
217+
await expect(releaseClaim({ ...baseInput, token: "owner-token", buffer: null })).resolves.toBeUndefined();
218+
});
219+
});
220+
221+
// End-to-end: the token from `claimOrAwait`'s `claimed` outcome must
222+
// reach `buffer.claimIdempotency` and round-trip through publishClaim /
223+
// releaseClaim. Without this the compare-and-act ownership protection
224+
// in the buffer is bypassed and the stale-claimant hazard returns.
225+
describe("claim ownership token wiring", () => {
226+
it("threads the token from claimOrAwait into buffer.claimIdempotency", async () => {
227+
const { buffer } = makeBuffer({ value: null });
228+
const outcome = await claimOrAwait({
229+
...baseInput,
230+
buffer,
231+
generateToken: () => "owner-token-xyz",
232+
});
233+
expect(outcome).toEqual({ kind: "claimed", token: "owner-token-xyz" });
234+
expect(buffer.claimIdempotency).toHaveBeenCalledWith({
235+
...baseInput,
236+
token: "owner-token-xyz",
237+
ttlSeconds: 30,
238+
});
239+
});
240+
241+
it("threads the token from publishClaim into buffer.publishClaim", async () => {
242+
const { buffer } = makeBuffer({ value: "pending" });
243+
await publishClaim({
244+
...baseInput,
245+
token: "owner-token-xyz",
246+
runId: "run_X",
247+
buffer,
248+
});
249+
expect(buffer.publishClaim).toHaveBeenCalledWith(
250+
expect.objectContaining({
251+
token: "owner-token-xyz",
252+
runId: "run_X",
253+
}),
254+
);
255+
});
256+
257+
it("threads the token from releaseClaim into buffer.releaseClaim", async () => {
258+
const { buffer } = makeBuffer({ value: "pending" });
259+
await releaseClaim({
260+
...baseInput,
261+
token: "owner-token-xyz",
262+
buffer,
263+
});
264+
expect(buffer.releaseClaim).toHaveBeenCalledWith(
265+
expect.objectContaining({ token: "owner-token-xyz" }),
266+
);
205267
});
206268
});

0 commit comments

Comments
 (0)