Skip to content

Commit 188b8c7

Browse files
d-cs and claude committed
fix(webapp): mollifier read-fallback auth/retry parity + batch reconstruction
Addresses the higher-confidence read-fallback review findings:

- attempts GET loader: rebuilt on createLoaderApiRoute so it matches the sibling read routes. It now accepts JWTs with run/task/tag/batch resource scoping (it previously used a bare authenticateApiRequest, which rejected PUBLIC_JWT and did no scope check), and it returns 404 with `x-should-retry: true` so SDK pollers keep retrying a not-yet-materialised run instead of giving up.
- batch reconstruction: the snapshot embeds the batch as `{ id, index }` (the engine.trigger shape), but readFallback read a non-existent flat `batchId`, so SyntheticRun.batchId was always undefined. It is now read from `snapshot.batch.id` (the internal cuid). synthesiseFoundRunFromBuffer populates `batch` from it, and the spans/trace buffer-path authorization pushes the batch resource, so batch-scoped JWTs authorise against buffered runs and the retrieve response reports the correct batchId.
- metadata: a non-string buffered metadata value is now coerced defensively (JSON-stringified, with a warning logged) instead of being silently dropped to null, mirroring synthesisePayload. In practice metadata is always a string, so this is a no-op guard, but it surfaces format drift to ops.
- tests: cover batchId extraction from the nested batch object, and its absence for non-batched runs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 81a0122 commit 188b8c7

6 files changed

Lines changed: 158 additions & 39 deletions

File tree

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
logger,
1010
} from "@trigger.dev/core/v3";
1111
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
12+
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
1213
import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly";
1314
import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
1415
import assertNever from "assert-never";
@@ -560,6 +561,32 @@ function synthesisePayload(buffered: SyntheticRun): string {
560561
}
561562
}
562563

564+
// Mirror synthesisePayload for metadata. The PG path stores
565+
// `TaskRun.metadata` as `String?`, and the snapshot writes it from
566+
// `metadataPacket.data` (also a string), so in production it is always a
567+
// string or absent. We coerce defensively — an object gets JSON-stringified
568+
// (matching how the trigger path serialises it) rather than silently
569+
// dropped to null, and the log line surfaces format drift to ops.
570+
function synthesiseMetadata(buffered: SyntheticRun): string | null {
571+
const metadata = buffered.metadata;
572+
if (typeof metadata === "string") return metadata;
573+
if (metadata === undefined || metadata === null) return null;
574+
try {
575+
const serialised = JSON.stringify(metadata);
576+
logger.warn("ApiRetrieveRunPresenter: buffered snapshot.metadata non-string coerced", {
577+
runFriendlyId: buffered.friendlyId,
578+
metadataType: typeof metadata,
579+
});
580+
return typeof serialised === "string" ? serialised : null;
581+
} catch {
582+
logger.error("ApiRetrieveRunPresenter: buffered snapshot.metadata unserialisable", {
583+
runFriendlyId: buffered.friendlyId,
584+
metadataType: typeof metadata,
585+
});
586+
return null;
587+
}
588+
}
589+
563590
function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
564591
const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status);
565592

@@ -570,8 +597,7 @@ function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
570597
}
571598
: null;
572599

573-
const metadata: Prisma.JsonValue =
574-
typeof buffered.metadata === "string" ? buffered.metadata : null;
600+
const metadata: string | null = synthesiseMetadata(buffered);
575601

576602
return {
577603
// `id` is the internal cuid (Prisma TaskRun.id column), `friendlyId`
@@ -603,7 +629,13 @@ function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
603629
scheduleId: null,
604630
lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null,
605631
resumeParentOnCompletion: buffered.resumeParentOnCompletion,
606-
batch: null,
632+
// Reconstruct the batch from the snapshot's internal id so a buffered
633+
// run reports the same `batchId` / triggerFunction as it will once
634+
// materialised, and so batch-scoped JWTs authorise against it (the
635+
// route authorization callbacks read `run.batch?.friendlyId`).
636+
batch: buffered.batchId
637+
? { id: buffered.batchId, friendlyId: BatchId.toFriendlyId(buffered.batchId) }
638+
: null,
607639
runTags: buffered.tags,
608640
traceId: buffered.traceId ?? "",
609641
payload: synthesisePayload(buffered),

apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ export const loader = createLoaderApiRoute(
7272
...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []),
7373
...run.tags.map((tag) => ({ type: "tags", id: tag })),
7474
];
75+
if (run.batchId) {
76+
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
77+
}
7578
return anyResource(resources);
7679
},
7780
},

apps/webapp/app/routes/api.v1.runs.$runId.trace.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ export const loader = createLoaderApiRoute(
7070
...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []),
7171
...run.tags.map((tag) => ({ type: "tags", id: tag })),
7272
];
73+
if (run.batchId) {
74+
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
75+
}
7376
return anyResource(resources);
7477
},
7578
},

apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts

Lines changed: 72 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1-
import type { ActionFunctionArgs, LoaderFunctionArgs } from "@remix-run/server-runtime";
1+
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { json } from "@remix-run/server-runtime";
3+
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
34
import { z } from "zod";
45
import { $replica } from "~/db.server";
56
import { authenticateApiRequest } from "~/services/apiAuth.server";
67
import { logger } from "~/services/logger.server";
8+
import {
9+
anyResource,
10+
createLoaderApiRoute,
11+
} from "~/services/routeBuilders/apiBuilder.server";
712
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
813
import { ServiceValidationError } from "~/v3/services/baseService.server";
914
import { CreateTaskRunAttemptService } from "~/v3/services/createTaskRunAttempt.server";
@@ -23,44 +28,76 @@ const ParamsSchema = z.object({
2328
// attempt list belongs on the v3 retrieve endpoint, not here — this is
2429
// the dual of the POST that creates attempts, and the empty-list shape
2530
// gives the parity script a stable contract to assert against.
26-
export async function loader({ request, params }: LoaderFunctionArgs) {
27-
const authenticationResult = await authenticateApiRequest(request);
28-
if (!authenticationResult) {
29-
return json({ error: "Invalid or Missing API Key" }, { status: 401 });
30-
}
31+
//
32+
// Built with createLoaderApiRoute so it matches the sibling read routes
33+
// (spans, trace, retrieve): it accepts JWTs (`allowJWT`) with the same
34+
// run/task/tag/batch resource scoping, and a not-found run returns 404
35+
// with `x-should-retry: true` (`shouldRetryNotFound`) so SDK pollers keep
36+
// retrying a run that the drainer hasn't materialised yet. PG-first then
37+
// buffer fallback, so a third party can't distinguish "exists" from
38+
// "doesn't exist" cross-environment.
39+
type ResolvedRun =
40+
| { source: "pg"; run: NonNullable<Awaited<ReturnType<typeof findPgRun>>> }
41+
| { source: "buffer"; run: NonNullable<Awaited<ReturnType<typeof findRunByIdWithMollifierFallback>>> };
3142

32-
const parsed = ParamsSchema.safeParse(params);
33-
if (!parsed.success) {
34-
return json({ error: "Invalid or missing run ID" }, { status: 400 });
35-
}
43+
async function findPgRun(runId: string, environmentId: string) {
44+
return $replica.taskRun.findFirst({
45+
where: { friendlyId: runId, runtimeEnvironmentId: environmentId },
46+
select: { friendlyId: true, taskIdentifier: true, runTags: true, batchId: true },
47+
});
48+
}
3649

37-
const { runParam } = parsed.data;
38-
const env = authenticationResult.environment;
50+
export const loader = createLoaderApiRoute(
51+
{
52+
params: ParamsSchema,
53+
allowJWT: true,
54+
corsStrategy: "all",
55+
findResource: async (params, auth): Promise<ResolvedRun | null> => {
56+
const pgRun = await findPgRun(params.runParam, auth.environment.id);
57+
if (pgRun) return { source: "pg", run: pgRun };
3958

40-
// Verify the run belongs to the authenticated environment before
41-
// returning the parity-empty list. The response body is empty either
42-
// way, but other run-scoped endpoints (spans, trace, retrieve) all
43-
// 404 on cross-env access; matching that here means a third party
44-
// can't distinguish "run exists" from "doesn't exist" via this
45-
// endpoint either. PG-first then buffer fallback, consistent with
46-
// the other read paths.
47-
const pgRun = await $replica.taskRun.findFirst({
48-
where: { friendlyId: runParam, runtimeEnvironmentId: env.id },
49-
select: { id: true },
50-
});
51-
if (!pgRun) {
52-
const buffered = await findRunByIdWithMollifierFallback({
53-
runId: runParam,
54-
environmentId: env.id,
55-
organizationId: env.organizationId,
56-
});
57-
if (!buffered) {
58-
return json({ error: "Run not found" }, { status: 404 });
59-
}
60-
}
59+
const buffered = await findRunByIdWithMollifierFallback({
60+
runId: params.runParam,
61+
environmentId: auth.environment.id,
62+
organizationId: auth.environment.organizationId,
63+
});
64+
if (buffered) return { source: "buffer", run: buffered };
6165

62-
return json({ attempts: [] }, { status: 200 });
63-
}
66+
return null;
67+
},
68+
shouldRetryNotFound: true,
69+
authorization: {
70+
action: "read",
71+
resource: (resolved) => {
72+
if (resolved.source === "pg") {
73+
const run = resolved.run;
74+
const resources = [
75+
{ type: "runs", id: run.friendlyId },
76+
{ type: "tasks", id: run.taskIdentifier },
77+
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
78+
];
79+
if (run.batchId) {
80+
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
81+
}
82+
return anyResource(resources);
83+
}
84+
const run = resolved.run;
85+
const resources = [
86+
{ type: "runs", id: run.friendlyId },
87+
...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []),
88+
...run.tags.map((tag) => ({ type: "tags", id: tag })),
89+
];
90+
if (run.batchId) {
91+
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
92+
}
93+
return anyResource(resources);
94+
},
95+
},
96+
},
97+
async () => {
98+
return json({ attempts: [] }, { status: 200 });
99+
}
100+
);
64101

65102
export async function action({ request, params }: ActionFunctionArgs) {
66103
// Authenticate the request

apps/webapp/app/v3/mollifier/readFallback.server.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,12 @@ export async function findRunByIdWithMollifierFallback(
201201
annotations: snapshot.annotations,
202202
traceContext: snapshot.traceContext,
203203
scheduleId: asString(snapshot.scheduleId),
204-
batchId: asString(snapshot.batchId),
204+
// The engine.trigger input embeds the batch as `{ id, index }`
205+
// (see triggerTask.server.ts #buildEngineTriggerInput), not as a
206+
// flat `batchId`. `id` is the batch's internal cuid — the same value
207+
// PG stores in `TaskRun.batchId` — so callers reconstruct the
208+
// friendly id via `BatchId.toFriendlyId` exactly as the PG path does.
209+
batchId: asString((snapshot.batch as { id?: unknown } | undefined)?.id),
205210
parentTaskRunFriendlyId: asString(snapshot.parentTaskRunFriendlyId),
206211
rootTaskRunFriendlyId: asString(snapshot.rootTaskRunFriendlyId),
207212

apps/webapp/test/mollifierReadFallback.test.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,45 @@ describe("findRunByIdWithMollifierFallback", () => {
259259
expect(result!.runTags).toEqual(["t1", "t2"]);
260260
});
261261

262+
it("extracts batchId from the snapshot's nested batch object (engine.trigger shape)", async () => {
263+
const entry: BufferEntry = {
264+
runId: "run_1",
265+
envId: "env_a",
266+
orgId: "org_1",
267+
payload: JSON.stringify({
268+
taskIdentifier: "t",
269+
// The engine.trigger input nests the batch as `{ id, index }`,
270+
// where `id` is the batch's internal cuid (not a flat `batchId`).
271+
batch: { id: "batch_internal_cuid", index: 3 },
272+
}),
273+
status: "QUEUED",
274+
attempts: 0,
275+
createdAt: NOW,
276+
};
277+
const result = await findRunByIdWithMollifierFallback(
278+
{ runId: "run_1", environmentId: "env_a", organizationId: "org_1" },
279+
{ getBuffer: () => fakeBuffer(entry) },
280+
);
281+
expect(result!.batchId).toBe("batch_internal_cuid");
282+
});
283+
284+
it("leaves batchId undefined when the snapshot has no batch (non-batched run)", async () => {
285+
const entry: BufferEntry = {
286+
runId: "run_1",
287+
envId: "env_a",
288+
orgId: "org_1",
289+
payload: JSON.stringify({ taskIdentifier: "t" }),
290+
status: "QUEUED",
291+
attempts: 0,
292+
createdAt: NOW,
293+
};
294+
const result = await findRunByIdWithMollifierFallback(
295+
{ runId: "run_1", environmentId: "env_a", organizationId: "org_1" },
296+
{ getBuffer: () => fakeBuffer(entry) },
297+
);
298+
expect(result!.batchId).toBeUndefined();
299+
});
300+
262301
it("treats invalid date strings as undefined and does not mis-classify status as CANCELED", async () => {
263302
const entry: BufferEntry = {
264303
runId: "run_1",

0 commit comments

Comments
 (0)