Skip to content

Commit f4b6064

Browse files
d-csclaude
andcommitted
fix(webapp): mutations-layer code-review follow-ups
- metadata route: drop the \`as unknown as Parameters<...>\` cast on the parent/root operations path. Widen \`routeOperationsToRun\`'s env parameter to \`AuthenticatedEnvironment\` so the service's typed signature carries through; the caller always has the full env in scope. - replay route: validate the buffered fallback against a Zod \`BufferedReplayInputSchema\` covering the fields \`ReplayTaskRunService.call\` actually reads (id, friendlyId, runtimeEnvironmentId, taskIdentifier, payload, payloadType, queue, isTest, traceId, spanId, engine, runTags + nullable concurrencyKey/workerQueue/machinePreset/realtimeStreamsVersion). Schema-fail logs the issue list and 404s rather than passing a half-shaped object into the service. - resetIdempotencyKey: distinguish "PG-empty + buffer-cleared-nothing" (genuine 404) from "PG-empty + buffer-unreachable" (partial outage — 503 with retry hint). The previous behaviour silently returned 404 on outage, hiding the partial failure and leaving a buffered key effectively un-reset. New regression test covers all four branches (PG-hit + buffer-throws, PG-empty + buffer-hit, PG-empty + buffer-clean-miss, PG-empty + buffer-outage, mollifier-disabled). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 2ee45a8 commit f4b6064

4 files changed

Lines changed: 175 additions & 15 deletions

File tree

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type { RunMetadataChangeOperation } from "@trigger.dev/core/v3/schemas";
55
import { UpdateMetadataRequestBody } from "@trigger.dev/core/v3";
66
import { z } from "zod";
77
import { $replica } from "~/db.server";
8+
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
89
import { authenticateApiRequest } from "~/services/apiAuth.server";
910
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
1011
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
@@ -69,20 +70,17 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
6970
async function routeOperationsToRun(
7071
targetRunId: string | undefined,
7172
operations: RunMetadataChangeOperation[] | undefined,
72-
env: { id: string; organizationId: string }
73+
env: AuthenticatedEnvironment
7374
): Promise<void> {
7475
if (!targetRunId || !operations || operations.length === 0) return;
7576

7677
// Try PG first via the existing service (this is how parent/root
77-
// operations have always landed; preserve that).
78+
// operations have always landed; preserve that). Accepts the full
79+
// AuthenticatedEnvironment so we don't have to recover the unsafe
80+
// `as unknown` cast that the previous narrowed `{ id, organizationId }`
81+
// signature forced on us.
7882
const [error] = await tryCatch(
79-
updateMetadataService.call(
80-
targetRunId,
81-
{ operations },
82-
{ id: env.id, organizationId: env.organizationId } as unknown as Parameters<
83-
typeof updateMetadataService.call
84-
>[2]
85-
)
83+
updateMetadataService.call(targetRunId, { operations }, env)
8684
);
8785
if (!error) return;
8886

apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,32 @@ const ParamsSchema = z.object({
1414
runParam: z.string(),
1515
});
1616

17+
// Subset of TaskRun fields that ReplayTaskRunService.call actually
18+
// reads from `existingTaskRun`. Validate the buffered fallback against
19+
// this before casting to TaskRun so a buffer-format drift surfaces as a
20+
// 404/422 here rather than as a silent NaN/undefined deep inside
21+
// replay. The full TaskRun type has many more fields the service never
22+
// touches; we only assert the ones it reads.
23+
const BufferedReplayInputSchema = z.object({
24+
id: z.string(),
25+
friendlyId: z.string(),
26+
runtimeEnvironmentId: z.string(),
27+
taskIdentifier: z.string(),
28+
payload: z.string(),
29+
payloadType: z.string(),
30+
queue: z.string(),
31+
isTest: z.boolean(),
32+
traceId: z.string(),
33+
spanId: z.string(),
34+
engine: z.string(),
35+
runTags: z.array(z.string()),
36+
// Nullable / optional fields the service tolerates via `??` fallbacks.
37+
concurrencyKey: z.string().nullable().optional(),
38+
workerQueue: z.string().nullable().optional(),
39+
machinePreset: z.string().nullable().optional(),
40+
realtimeStreamsVersion: z.string().nullable().optional(),
41+
});
42+
1743
export async function action({ request, params }: ActionFunctionArgs) {
1844
// Ensure this is a POST request
1945
if (request.method.toUpperCase() !== "POST") {
@@ -49,16 +75,28 @@ export async function action({ request, params }: ActionFunctionArgs) {
4975
if (!taskRun) {
5076
// Buffered fallback (Q2). The SyntheticRun shape was extended in
5177
// Phase B4 to carry every field ReplayTaskRunService reads from a
52-
// TaskRun. Cast through unknown — the synthesised object has the
53-
// same field surface as a real PG row from the service's
54-
// perspective.
78+
// TaskRun. Validate the subset of fields the service consumes
79+
// (BufferedReplayInputSchema above) before casting; a schema
80+
// mismatch surfaces as a 404 here rather than as a silent
81+
// undefined deep inside the service.
5582
const buffered = await findRunByIdWithMollifierFallback({
5683
runId: runParam,
5784
environmentId: env.id,
5885
organizationId: env.organizationId,
5986
});
6087
if (buffered) {
61-
taskRun = buffered as unknown as TaskRun;
88+
const parsed = BufferedReplayInputSchema.safeParse(buffered);
89+
if (parsed.success) {
90+
taskRun = parsed.data as unknown as TaskRun;
91+
} else {
92+
logger.warn("replay: buffered fallback failed schema validation", {
93+
runParam,
94+
issues: parsed.error.issues.map((issue) => ({
95+
path: issue.path.join("."),
96+
code: issue.code,
97+
})),
98+
});
99+
}
62100
}
63101
}
64102

apps/webapp/app/v3/services/resetIdempotencyKey.server.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export class ResetIdempotencyKeyService extends BaseService {
2626
// resetIdempotency clears both the snapshot fields and the Redis
2727
// lookup atomically. Returns null when nothing was bound there.
2828
const buffer = getMollifierBuffer();
29+
let bufferResetFailed = false;
2930
const bufferResult = buffer
3031
? await buffer
3132
.resetIdempotency({
@@ -34,8 +35,12 @@ export class ResetIdempotencyKeyService extends BaseService {
3435
idempotencyKey,
3536
})
3637
.catch((err) => {
37-
// Buffer outage shouldn't 500 the reset endpoint if PG
38-
// already cleared something. Log and treat as a miss.
38+
// Don't drop a buffer outage on the floor. We log + flag so
39+
// the 404 branch below can distinguish "no record anywhere"
40+
// (legitimate not-found) from "PG cleared nothing AND we
41+
// couldn't see the buffer" (partial outage — caller should
42+
// retry, not be told "doesn't exist").
43+
bufferResetFailed = true;
3944
logger.error("ResetIdempotencyKeyService: buffer reset failed", {
4045
idempotencyKey,
4146
taskIdentifier,
@@ -47,6 +52,16 @@ export class ResetIdempotencyKeyService extends BaseService {
4752

4853
const totalCount = pgCount + (bufferResult.clearedRunId ? 1 : 0);
4954

55+
if (pgCount === 0 && bufferResetFailed) {
56+
// PG saw nothing AND the buffer is unreachable. We can't truthfully
57+
// say "not found" — there may be a buffered run we can't observe.
58+
// Surface as 503 so the caller retries instead of being misled.
59+
throw new ServiceValidationError(
60+
"Unable to verify buffered idempotency state right now; please retry",
61+
503
62+
);
63+
}
64+
5065
if (totalCount === 0) {
5166
throw new ServiceValidationError(
5267
`No runs found with idempotency key: ${idempotencyKey} and task: ${taskIdentifier}`,
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
// Mock the db module so the BaseService default prisma doesn't try to
4+
// open a real connection at module load. Each test wires its own
5+
// prisma stub.
6+
vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} }));
7+
// Prevent the runEngine singleton from instantiating and spinning up
8+
// PG/Redis workers at module load — without this CI fails with
9+
// unhandled `PrismaClientInitializationError`s even though the
10+
// assertions all pass (see `mollifierDrainerWorker.test.ts`).
11+
vi.mock("~/v3/runEngine.server", () => ({ engine: {} }));
12+
13+
// Hoisted mock state so we can swap the buffer per test without
14+
// re-importing modules.
15+
const bufferMock: { current: unknown } = { current: null };
16+
vi.mock("~/v3/mollifier/mollifierBuffer.server", () => ({
17+
getMollifierBuffer: () => bufferMock.current,
18+
}));
19+
20+
import { ResetIdempotencyKeyService } from "~/v3/services/resetIdempotencyKey.server";
21+
import { ServiceValidationError } from "~/v3/services/baseService.server";
22+
23+
type FakePrisma = {
24+
taskRun: { updateMany: (...args: unknown[]) => Promise<{ count: number }> };
25+
};
26+
27+
function makePrisma(pgCount: number): FakePrisma {
28+
return {
29+
taskRun: {
30+
updateMany: vi.fn(async () => ({ count: pgCount })),
31+
},
32+
};
33+
}
34+
35+
const env = {
36+
id: "env_a",
37+
organizationId: "org_1",
38+
} as unknown as Parameters<ResetIdempotencyKeyService["call"]>[2];
39+
40+
describe("ResetIdempotencyKeyService — buffer-outage handling", () => {
41+
it("returns success when PG cleared >=1 run, even if the buffer reset throws", async () => {
42+
bufferMock.current = {
43+
resetIdempotency: vi.fn(async () => {
44+
throw new Error("ECONNREFUSED");
45+
}),
46+
};
47+
const prisma = makePrisma(1);
48+
const service = new ResetIdempotencyKeyService(prisma as never);
49+
50+
const result = await service.call("ikey", "task", env);
51+
expect(result).toEqual({ id: "ikey" });
52+
});
53+
54+
it("returns success when PG cleared nothing but the buffer cleared a run", async () => {
55+
bufferMock.current = {
56+
resetIdempotency: vi.fn(async () => ({ clearedRunId: "run_x" })),
57+
};
58+
const prisma = makePrisma(0);
59+
const service = new ResetIdempotencyKeyService(prisma as never);
60+
61+
const result = await service.call("ikey", "task", env);
62+
expect(result).toEqual({ id: "ikey" });
63+
});
64+
65+
it("404s when PG and buffer both legitimately report 'nothing to clear'", async () => {
66+
bufferMock.current = {
67+
resetIdempotency: vi.fn(async () => ({ clearedRunId: null })),
68+
};
69+
const prisma = makePrisma(0);
70+
const service = new ResetIdempotencyKeyService(prisma as never);
71+
72+
await expect(service.call("ikey", "task", env)).rejects.toMatchObject({
73+
status: 404,
74+
});
75+
});
76+
77+
// Regression for the silent-not-found hazard CodeRabbit flagged: if PG
78+
// sees nothing AND we can't read the buffer (Redis outage), the
79+
// previous behaviour was to 404 — masking a partial outage and
80+
// leaving a buffered key effectively un-reset while the caller was
81+
// told "doesn't exist." We now surface 503 so the caller retries.
82+
it("503s when PG cleared nothing AND the buffer reset failed (partial outage)", async () => {
83+
bufferMock.current = {
84+
resetIdempotency: vi.fn(async () => {
85+
throw new Error("ECONNREFUSED");
86+
}),
87+
};
88+
const prisma = makePrisma(0);
89+
const service = new ResetIdempotencyKeyService(prisma as never);
90+
91+
const error = await service.call("ikey", "task", env).then(
92+
() => null,
93+
(err) => err,
94+
);
95+
expect(error).toBeInstanceOf(ServiceValidationError);
96+
expect(error.status).toBe(503);
97+
expect(error.message).toMatch(/retry/i);
98+
});
99+
100+
it("404s normally when buffer is null (mollifier disabled) and PG cleared nothing", async () => {
101+
bufferMock.current = null;
102+
const prisma = makePrisma(0);
103+
const service = new ResetIdempotencyKeyService(prisma as never);
104+
105+
await expect(service.call("ikey", "task", env)).rejects.toMatchObject({
106+
status: 404,
107+
});
108+
});
109+
});

0 commit comments

Comments
 (0)