Skip to content

Commit 0f7365d

Browse files
d-csclaude
andcommitted
fix(webapp,redis-worker): enforce MAX_TAGS on buffered runs + surface metadata fallback errors
The tags API skipped MAX_TAGS_PER_RUN enforcement on the buffered path, letting a buffered run exceed the cap the trigger validator applies at creation. Enforce it atomically in the mutateSnapshot Lua: append_tags now accepts an optional maxTags and returns "limit_exceeded" (writing nothing) when the deduped count would overflow. mutateWithFallback gains a symmetric rejectedResponse builder + a "rejected" outcome; the tags route returns 422, matching the PG path. Also stop silently swallowing PG failures in the metadata route's parent/root op fan-out: warn (with targetRunId + error) before the best-effort buffer fallback so a genuine PG outage is observable. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1395925 commit 0f7365d

7 files changed

Lines changed: 155 additions & 14 deletions

File tree

.changeset/mollifier-tag-cap.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/redis-worker": patch
3+
---
4+
5+
Mollifier `mutateSnapshot` now enforces a tag cap: an `append_tags` patch carrying `maxTags` returns `"limit_exceeded"` (writing nothing) when the deduped tag count would exceed the limit, so a buffered run can't accumulate more tags via the tags API than the trigger validator allows at creation.

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { z } from "zod";
77
import { $replica } from "~/db.server";
88
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
99
import { authenticateApiRequest } from "~/services/apiAuth.server";
10+
import { logger } from "~/services/logger.server";
1011
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
1112
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
1213
import { ServiceValidationError } from "~/v3/services/common.server";
@@ -84,10 +85,17 @@ async function routeOperationsToRun(
8485
);
8586
if (!error) return;
8687

87-
// PG service threw — could be "Cannot update metadata for a completed
88-
// run" or similar. If the target is buffered, route operations to its
89-
// snapshot too. Best-effort; do not surface this failure to the
90-
// caller — the parent/root ops are auxiliary.
88+
// PG service threw — commonly "Cannot update metadata for a completed
89+
// run", but it could also be a transient PG failure. The parent/root
90+
// ops are auxiliary, so we stay best-effort and don't surface this to
91+
// the caller — but we must not swallow the failure silently, otherwise
92+
// a genuine PG outage on these ops is invisible. Warn, then try the
93+
// buffer in case the target is itself buffered.
94+
logger.warn("metadata route: parent/root PG op failed, falling back to buffer", {
95+
targetRunId,
96+
error: error instanceof Error ? error.message : String(error),
97+
});
98+
9199
await applyMetadataMutationToBufferedRun({
92100
runId: targetRunId,
93101
body: { operations },

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ export async function action({ request, params }: ActionFunctionArgs) {
4444
}
4545

4646
const env = authenticationResult.environment;
47-
const outcome = await mutateWithFallback({
47+
const outcome = await mutateWithFallback<Response>({
4848
runId: parsedParams.data.runId,
4949
environmentId: env.id,
5050
organizationId: env.organizationId,
51-
bufferPatch: { type: "append_tags", tags: nonEmptyTags },
51+
bufferPatch: { type: "append_tags", tags: nonEmptyTags, maxTags: MAX_TAGS_PER_RUN },
5252
pgMutation: async (taskRun) => {
5353
const existing = taskRun.runTags ?? [];
5454
const newTags = nonEmptyTags.filter((t) => !existing.includes(t));
@@ -76,13 +76,20 @@ export async function action({ request, params }: ActionFunctionArgs) {
7676
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
7777
},
7878
// Buffer-applied patch path. The mutateSnapshot Lua deduplicates
79-
// against existing snapshot tags atomically. MAX_TAGS_PER_RUN
80-
// enforcement is skipped on the buffered side — the drainer's
81-
// engine.trigger writes the PG row without enforcement either,
82-
// matching today's pre-buffer trigger semantics. A future
83-
// refinement could push the limit check into the Lua.
79+
// against existing snapshot tags atomically and enforces
80+
// MAX_TAGS_PER_RUN via the `maxTags` we pass in `bufferPatch` —
81+
// matching the PG-path cap above so a buffered run can't exceed the
82+
// limit the trigger validator applies at creation.
8483
synthesisedResponse: () =>
8584
json({ message: `Successfully set ${nonEmptyTags.length} new tags.` }, { status: 200 }),
85+
// Buffer rejected the append because it would exceed the cap. We
86+
// don't know the exact deduped overflow count here (the Lua does),
87+
// so report the limit rather than a precise "trying to set N".
88+
rejectedResponse: () =>
89+
json(
90+
{ error: `Runs can only have ${MAX_TAGS_PER_RUN} tags.` },
91+
{ status: 422 }
92+
),
8693
abortSignal: getRequestAbortSignal(),
8794
});
8895

apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ export type MutateWithFallbackInput<TResponse> = {
2828
// Called when the patch landed cleanly on the buffer snapshot. The
2929
// drainer will see the patched payload on its next pop.
3030
synthesisedResponse: () => TResponse | Promise<TResponse>;
31+
// Called when the buffer rejected the patch as invalid (e.g. an
32+
// `append_tags` patch carrying `maxTags` would exceed the cap). Required
33+
// only by callers that send a rejectable patch; the helper throws if the
34+
// buffer reports a rejection and no builder was supplied.
35+
rejectedResponse?: () => TResponse | Promise<TResponse>;
3136
abortSignal?: AbortSignal;
3237
// Override defaults for tests.
3338
safetyNetMs?: number;
@@ -47,6 +52,7 @@ export type MutateWithFallbackInput<TResponse> = {
4752
export type MutateWithFallbackOutcome<TResponse> =
4853
| { kind: "pg"; response: TResponse }
4954
| { kind: "snapshot"; response: TResponse }
55+
| { kind: "rejected"; response: TResponse }
5056
| { kind: "not_found" }
5157
| { kind: "timed_out" };
5258

@@ -86,6 +92,18 @@ export async function mutateWithFallback<TResponse>(
8692
return { kind: "snapshot", response: await input.synthesisedResponse() };
8793
}
8894

95+
if (result === "limit_exceeded") {
96+
// The buffer refused the patch (e.g. tag cap). Nothing was written.
97+
// Surface the caller's rejection body; a missing builder means the
98+
// caller sent a rejectable patch without handling the rejection.
99+
if (!input.rejectedResponse) {
100+
throw new Error(
101+
"mutateWithFallback: buffer returned 'limit_exceeded' but no rejectedResponse was provided",
102+
);
103+
}
104+
return { kind: "rejected", response: await input.rejectedResponse() };
105+
}
106+
89107
if (result === "not_found") {
90108
// Disambiguate a genuine 404 from a replica-lag miss: ask the writer
91109
// directly. If the row just appeared post-drain we route through the

apps/webapp/test/mollifierMutateWithFallback.test.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,36 @@ describe("mutateWithFallback", () => {
283283
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(0);
284284
});
285285

286+
it("replica miss + buffer limit_exceeded → rejected via rejectedResponse builder", async () => {
287+
const pgMutation = vi.fn(async () => "pg");
288+
const synthesisedResponse = vi.fn(() => "snap");
289+
const result = await mutateWithFallback({
290+
...baseInput,
291+
pgMutation,
292+
synthesisedResponse,
293+
rejectedResponse: () => "too-many-tags",
294+
prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
295+
prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma,
296+
getBuffer: () => bufferReturning("limit_exceeded"),
297+
});
298+
expect(result).toEqual({ kind: "rejected", response: "too-many-tags" });
299+
expect(pgMutation).not.toHaveBeenCalled();
300+
expect(synthesisedResponse).not.toHaveBeenCalled();
301+
});
302+
303+
it("buffer limit_exceeded without a rejectedResponse builder → throws (programmer error)", async () => {
304+
await expect(
305+
mutateWithFallback({
306+
...baseInput,
307+
pgMutation: async () => "pg",
308+
synthesisedResponse: () => "snap",
309+
prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
310+
prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma,
311+
getBuffer: () => bufferReturning("limit_exceeded"),
312+
})
313+
).rejects.toThrow(/limit_exceeded/);
314+
});
315+
286316
it("buffer is null (mollifier disabled) → not_found after replica miss", async () => {
287317
const result = await mutateWithFallback({
288318
...baseInput,

packages/redis-worker/src/mollifier/buffer.test.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1937,6 +1937,61 @@ describe("MollifierBuffer.mutateSnapshot", () => {
19371937
},
19381938
);
19391939

1940+
redisTest(
1941+
"append_tags rejects with limit_exceeded when maxTags would be exceeded, writing nothing",
1942+
{ timeout: 20_000 },
1943+
async ({ redisContainer }) => {
1944+
const buffer = new MollifierBuffer({
1945+
redisOptions: {
1946+
host: redisContainer.getHost(),
1947+
port: redisContainer.getPort(),
1948+
password: redisContainer.getPassword(),
1949+
},
1950+
logger: new Logger("test", "log"),
1951+
});
1952+
try {
1953+
await buffer.accept({
1954+
runId: "r_cap",
1955+
envId: "env_m",
1956+
orgId: "org_1",
1957+
payload: serialiseSnapshot({ tags: ["a", "b"] }),
1958+
});
1959+
1960+
// 2 existing + 2 new = 4 deduped > cap of 3 → rejected, nothing written.
1961+
const rejected = await buffer.mutateSnapshot("r_cap", {
1962+
type: "append_tags",
1963+
tags: ["c", "d"],
1964+
maxTags: 3,
1965+
});
1966+
expect(rejected).toBe("limit_exceeded");
1967+
const afterReject = await buffer.getEntry("r_cap");
1968+
const rejPayload = JSON.parse(afterReject!.payload) as { tags: string[] };
1969+
expect(rejPayload.tags).toEqual(["a", "b"]);
1970+
1971+
// Dedup keeps the count under the cap → applied.
1972+
const applied = await buffer.mutateSnapshot("r_cap", {
1973+
type: "append_tags",
1974+
tags: ["a", "c"],
1975+
maxTags: 3,
1976+
});
1977+
expect(applied).toBe("applied_to_snapshot");
1978+
const afterApply = await buffer.getEntry("r_cap");
1979+
const appPayload = JSON.parse(afterApply!.payload) as { tags: string[] };
1980+
expect(appPayload.tags).toEqual(["a", "b", "c"]);
1981+
1982+
// Landing exactly on the cap is allowed.
1983+
const exact = await buffer.mutateSnapshot("r_cap", {
1984+
type: "append_tags",
1985+
tags: ["a", "b", "c"],
1986+
maxTags: 3,
1987+
});
1988+
expect(exact).toBe("applied_to_snapshot");
1989+
} finally {
1990+
await buffer.close();
1991+
}
1992+
},
1993+
);
1994+
19401995
redisTest(
19411996
"set_metadata replaces metadata + metadataType (last-write-wins)",
19421997
{ timeout: 20_000 },

packages/redis-worker/src/mollifier/buffer.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,21 @@ export function mollifierReconnectDelayMs(
3737
}
3838

3939
export type SnapshotPatch =
40-
| { type: "append_tags"; tags: string[] }
40+
// `maxTags`, when set, caps the deduped tag count atomically inside the
41+
// Lua: if appending would push the snapshot over the limit the patch is
42+
// rejected ("limit_exceeded") and nothing is written, mirroring the
43+
// PG-path MAX_TAGS_PER_RUN check so a buffered run can't accumulate more
44+
// tags than the trigger validator would have allowed at creation.
45+
| { type: "append_tags"; tags: string[]; maxTags?: number }
4146
| { type: "set_metadata"; metadata: string; metadataType: string }
4247
| { type: "set_delay"; delayUntil: string }
4348
| { type: "mark_cancelled"; cancelledAt: string; cancelReason?: string };
4449

45-
export type MutateSnapshotResult = "applied_to_snapshot" | "not_found" | "busy";
50+
export type MutateSnapshotResult =
51+
| "applied_to_snapshot"
52+
| "not_found"
53+
| "busy"
54+
| "limit_exceeded";
4655

4756
export type CasSetMetadataResult =
4857
| { kind: "applied"; newVersion: number }
@@ -311,6 +320,8 @@ export class MollifierBuffer {
311320
// FAILED entry, whose hash the drainer-terminal `fail` path DELs.
312321
// - "busy": entry is DRAINING or materialised. The API
313322
// wait-and-bounces through PG.
323+
// - "limit_exceeded": an `append_tags` patch carrying `maxTags` would
324+
// push the deduped tag count over the cap; nothing is written.
314325
async mutateSnapshot(runId: string, patch: SnapshotPatch): Promise<MutateSnapshotResult> {
315326
const result = (await this.redis.mutateMollifierSnapshot(
316327
`mollifier:entries:${runId}`,
@@ -319,7 +330,8 @@ export class MollifierBuffer {
319330
if (
320331
result === "applied_to_snapshot" ||
321332
result === "not_found" ||
322-
result === "busy"
333+
result === "busy" ||
334+
result === "limit_exceeded"
323335
) {
324336
return result;
325337
}
@@ -914,6 +926,12 @@ export class MollifierBuffer {
914926
table.insert(merged, t)
915927
end
916928
end
929+
-- Cap the deduped count when the caller supplies a limit, so a
930+
-- buffered run can't exceed MAX_TAGS_PER_RUN via the tags API.
931+
-- Reject the whole patch (write nothing) rather than truncating.
932+
if patch.maxTags ~= nil and #merged > patch.maxTags then
933+
return 'limit_exceeded'
934+
end
917935
payload.tags = merged
918936
elseif patch.type == 'set_metadata' then
919937
payload.metadata = patch.metadata

0 commit comments

Comments
 (0)