Skip to content

Commit 7482e21

Browse files
d-csclaude
andcommitted
feat(webapp): mollifier API GET read-fallback — synthetic primitives + route wiring
Synthesise QUEUED/FAILED responses from the mollifier buffer when a TaskRun row hasn't landed in Postgres yet. Wires the synthesis into: - ApiRetrieveRunPresenter - v1 trace GET route - v1 spans GET route - attempts route gains a GET loader (fixes pre-existing Remix 400) Stacked on the trigger-time decisions PR. The readFallback infra itself lives on the trigger PR (consumed by IdempotencyKeyConcern); this PR adds the route-level synthetic-rendering primitives. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c6fa61f commit 7482e21

9 files changed

Lines changed: 926 additions & 30 deletions

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ import assertNever from "assert-never";
1515
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
1616
import { $replica, prisma } from "~/db.server";
1717
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
18+
import {
19+
findRunByIdWithMollifierFallback,
20+
type SyntheticRun,
21+
} from "~/v3/mollifier/readFallback.server";
1822
import { generatePresignedUrl } from "~/v3/objectStore.server";
1923
import { tracer } from "~/v3/tracer.server";
2024
import { startSpanWithEnv } from "~/v3/tracing.server";
@@ -64,13 +68,34 @@ type CommonRelatedRun = Prisma.Result<
6468
"findFirstOrThrow"
6569
>;
6670

67-
type FoundRun = NonNullable<Awaited<ReturnType<typeof ApiRetrieveRunPresenter.findRun>>>;
71+
// Full shape returned by findRun() — the commonRunSelect fields plus the
72+
// extras the route handler reads. Declared explicitly (not inferred via
73+
// ReturnType<typeof findRun>) so findRun can return a synthesised buffered
74+
// run without the type becoming self-referential.
75+
type FoundRun = CommonRelatedRun & {
76+
traceId: string;
77+
payload: string;
78+
payloadType: string;
79+
output: string | null;
80+
outputType: string;
81+
error: Prisma.JsonValue;
82+
attempts: { id: string }[];
83+
attemptNumber: number | null;
84+
engine: "V1" | "V2";
85+
taskEventStore: string;
86+
parentTaskRun: CommonRelatedRun | null;
87+
rootTaskRun: CommonRelatedRun | null;
88+
childRuns: CommonRelatedRun[];
89+
};
6890

6991
export class ApiRetrieveRunPresenter {
7092
constructor(private readonly apiVersion: API_VERSIONS) {}
7193

72-
public static async findRun(friendlyId: string, env: AuthenticatedEnvironment) {
73-
return $replica.taskRun.findFirst({
94+
public static async findRun(
95+
friendlyId: string,
96+
env: AuthenticatedEnvironment,
97+
): Promise<FoundRun | null> {
98+
const pgRow = await $replica.taskRun.findFirst({
7499
where: {
75100
friendlyId,
76101
runtimeEnvironmentId: env.id,
@@ -102,6 +127,23 @@ export class ApiRetrieveRunPresenter {
102127
},
103128
},
104129
});
130+
131+
if (pgRow) return pgRow;
132+
133+
// Postgres miss → fall back to the mollifier buffer. When the gate
134+
// diverted a trigger, the run lives in Redis until the drainer replays
135+
// it through engine.trigger. Synthesise the FoundRun shape so call()
136+
// returns a `QUEUED` (or `FAILED`) response with empty output, no
137+
// attempts, no relations.
138+
const buffered = await findRunByIdWithMollifierFallback({
139+
runId: friendlyId,
140+
environmentId: env.id,
141+
organizationId: env.organizationId,
142+
});
143+
144+
if (!buffered) return null;
145+
146+
return synthesiseFoundRunFromBuffer(buffered);
105147
}
106148

107149
public async call(taskRun: FoundRun, env: AuthenticatedEnvironment) {
@@ -475,3 +517,75 @@ function resolveTriggerFunction(run: CommonRelatedRun): TriggerFunction {
475517
return run.resumeParentOnCompletion ? "triggerAndWait" : "trigger";
476518
}
477519
}
520+
521+
// Build a FoundRun-shaped object from a buffered (mollified) run. The run
522+
// is in the Redis buffer; engine.trigger hasn't created the Postgres row
523+
// yet, so every field that comes from execution state (output, attempts,
524+
// completedAt, cost, relations) takes a default. The presenter's call()
525+
// handles QUEUED-state runs without surprise.
526+
function bufferedStatusToTaskRunStatus(status: SyntheticRun["status"]): TaskRunStatus {
527+
switch (status) {
528+
case "FAILED":
529+
return "SYSTEM_FAILURE";
530+
case "CANCELED":
531+
return "CANCELED";
532+
default:
533+
return "PENDING";
534+
}
535+
}
536+
537+
function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
538+
const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status);
539+
540+
const errorJson: Prisma.JsonValue = buffered.error
541+
? {
542+
type: "STRING_ERROR",
543+
raw: `${buffered.error.code}: ${buffered.error.message}`,
544+
}
545+
: null;
546+
547+
const metadata: Prisma.JsonValue =
548+
typeof buffered.metadata === "string" ? buffered.metadata : null;
549+
550+
return {
551+
id: buffered.friendlyId,
552+
friendlyId: buffered.friendlyId,
553+
status,
554+
taskIdentifier: buffered.taskIdentifier ?? "",
555+
createdAt: buffered.createdAt,
556+
startedAt: null,
557+
updatedAt: buffered.cancelledAt ?? buffered.createdAt,
558+
completedAt: buffered.cancelledAt ?? null,
559+
expiredAt: null,
560+
delayUntil: buffered.delayUntil ?? null,
561+
metadata,
562+
metadataType: buffered.metadataType ?? "application/json",
563+
ttl: buffered.ttl ?? null,
564+
costInCents: 0,
565+
baseCostInCents: 0,
566+
usageDurationMs: 0,
567+
idempotencyKey: buffered.idempotencyKey ?? null,
568+
idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null,
569+
isTest: buffered.isTest,
570+
depth: buffered.depth,
571+
scheduleId: null,
572+
lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null,
573+
resumeParentOnCompletion: buffered.resumeParentOnCompletion,
574+
batch: null,
575+
runTags: buffered.tags,
576+
traceId: buffered.traceId ?? "",
577+
payload: typeof buffered.payload === "string" ? buffered.payload : "",
578+
payloadType: buffered.payloadType ?? "application/json",
579+
output: null,
580+
outputType: "application/json",
581+
error: errorJson,
582+
attempts: [],
583+
attemptNumber: null,
584+
engine: "V2",
585+
taskEventStore: "taskEvent",
586+
workerQueue: buffered.workerQueue ?? "main",
587+
parentTaskRun: null,
588+
rootTaskRun: null,
589+
childRuns: [],
590+
};
591+
}

apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,42 +9,101 @@ import {
99
} from "~/services/routeBuilders/apiBuilder.server";
1010
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
1111
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
12+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
1213

1314
const ParamsSchema = z.object({
1415
runId: z.string(),
1516
spanId: z.string(),
1617
});
1718

19+
// Phase A2 — discriminated union for PG vs buffered runs. Buffered runs
20+
// only have one valid spanId (the queued span recorded at gate time and
21+
// reused as the run's root spanId when the drainer materialises). Any
22+
// other spanId returns a deterministic 404; the queued span returns a
23+
// minimal synthesised shape so the customer's SDK sees the same 200
24+
// contract they'd get for a freshly-triggered run.
25+
type ResolvedRun =
26+
| { source: "pg"; run: Awaited<ReturnType<typeof findPgRun>> & {} }
27+
| { source: "buffer"; run: NonNullable<Awaited<ReturnType<typeof findRunByIdWithMollifierFallback>>> };
28+
29+
async function findPgRun(runId: string, environmentId: string) {
30+
return $replica.taskRun.findFirst({
31+
where: { friendlyId: runId, runtimeEnvironmentId: environmentId },
32+
});
33+
}
34+
1835
export const loader = createLoaderApiRoute(
1936
{
2037
params: ParamsSchema,
2138
allowJWT: true,
2239
corsStrategy: "all",
23-
findResource: (params, auth) => {
24-
return $replica.taskRun.findFirst({
25-
where: {
26-
friendlyId: params.runId,
27-
runtimeEnvironmentId: auth.environment.id,
28-
},
40+
findResource: async (params, auth): Promise<ResolvedRun | null> => {
41+
const pgRun = await findPgRun(params.runId, auth.environment.id);
42+
if (pgRun) return { source: "pg", run: pgRun };
43+
44+
const buffered = await findRunByIdWithMollifierFallback({
45+
runId: params.runId,
46+
environmentId: auth.environment.id,
47+
organizationId: auth.environment.organizationId,
2948
});
49+
if (buffered) return { source: "buffer", run: buffered };
50+
51+
return null;
3052
},
3153
shouldRetryNotFound: true,
3254
authorization: {
3355
action: "read",
34-
resource: (run) => {
56+
resource: (resolved) => {
57+
if (resolved.source === "pg") {
58+
const run = resolved.run;
59+
const resources = [
60+
{ type: "runs", id: run.friendlyId },
61+
{ type: "tasks", id: run.taskIdentifier },
62+
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
63+
];
64+
if (run.batchId) {
65+
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
66+
}
67+
return anyResource(resources);
68+
}
69+
const run = resolved.run;
3570
const resources = [
3671
{ type: "runs", id: run.friendlyId },
37-
{ type: "tasks", id: run.taskIdentifier },
38-
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
72+
...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []),
73+
...run.tags.map((tag) => ({ type: "tags", id: tag })),
3974
];
40-
if (run.batchId) {
41-
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
42-
}
4375
return anyResource(resources);
4476
},
4577
},
4678
},
47-
async ({ params, resource: run, authentication }) => {
79+
async ({ params, resource: resolved, authentication }) => {
80+
if (resolved.source === "buffer") {
81+
// Buffered runs have exactly one valid spanId — the queued span the
82+
// mollifier gate recorded at trigger time, which becomes the run's
83+
// root spanId once the drainer materialises. Any other spanId is a
84+
// deterministic 404. The matching spanId returns a minimal shape
85+
// representing "span exists, no execution data yet."
86+
if (resolved.run.spanId !== params.spanId) {
87+
return json({ error: "Span not found" }, { status: 404 });
88+
}
89+
return json(
90+
{
91+
spanId: resolved.run.spanId,
92+
parentId: resolved.run.parentSpanId ?? null,
93+
runId: resolved.run.friendlyId,
94+
message: resolved.run.taskIdentifier ?? "",
95+
isError: false,
96+
isPartial: resolved.run.status !== "CANCELED",
97+
isCancelled: resolved.run.status === "CANCELED",
98+
level: "TRACE",
99+
startTime: resolved.run.createdAt,
100+
durationMs: 0,
101+
},
102+
{ status: 200 }
103+
);
104+
}
105+
106+
const run = resolved.run;
48107
const eventRepository = await getEventRepositoryForStore(
49108
run.taskEventStore,
50109
authentication.environment.organization.id

apps/webapp/app/routes/api.v1.runs.$runId.trace.ts

Lines changed: 80 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,41 +8,108 @@ import {
88
} from "~/services/routeBuilders/apiBuilder.server";
99
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
1010
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
11+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
1112

1213
const ParamsSchema = z.object({
1314
runId: z.string(), // This is the run friendly ID
1415
});
1516

17+
// Discriminator on the resolved resource — `pg` is the real Prisma TaskRun
18+
// row, `buffer` is a synthesised shape from the mollifier buffer for runs
19+
// whose drainer hasn't yet materialised them. The handler renders an empty
20+
// trace for buffered runs so the customer sees the same 200 shape they'd
21+
// get for a freshly-triggered PG run with no spans yet (matches the
22+
// pass-through control case in scripts/mollifier-api-parity.sh).
23+
type ResolvedRun =
24+
| { source: "pg"; run: Awaited<ReturnType<typeof findPgRun>> & {} }
25+
| { source: "buffer"; run: NonNullable<Awaited<ReturnType<typeof findRunByIdWithMollifierFallback>>> };
26+
27+
async function findPgRun(runId: string, environmentId: string) {
28+
return $replica.taskRun.findFirst({
29+
where: { friendlyId: runId, runtimeEnvironmentId: environmentId },
30+
});
31+
}
32+
1633
export const loader = createLoaderApiRoute(
1734
{
1835
params: ParamsSchema,
1936
allowJWT: true,
2037
corsStrategy: "all",
21-
findResource: (params, auth) => {
22-
return $replica.taskRun.findFirst({
23-
where: {
24-
friendlyId: params.runId,
25-
runtimeEnvironmentId: auth.environment.id,
26-
},
38+
findResource: async (params, auth): Promise<ResolvedRun | null> => {
39+
const pgRun = await findPgRun(params.runId, auth.environment.id);
40+
if (pgRun) return { source: "pg", run: pgRun };
41+
42+
const buffered = await findRunByIdWithMollifierFallback({
43+
runId: params.runId,
44+
environmentId: auth.environment.id,
45+
organizationId: auth.environment.organizationId,
2746
});
47+
if (buffered) return { source: "buffer", run: buffered };
48+
49+
return null;
2850
},
2951
shouldRetryNotFound: true,
3052
authorization: {
3153
action: "read",
32-
resource: (run) => {
54+
resource: (resolved) => {
55+
if (resolved.source === "pg") {
56+
const run = resolved.run;
57+
const resources = [
58+
{ type: "runs", id: run.friendlyId },
59+
{ type: "tasks", id: run.taskIdentifier },
60+
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
61+
];
62+
if (run.batchId) {
63+
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
64+
}
65+
return anyResource(resources);
66+
}
67+
const run = resolved.run;
3368
const resources = [
3469
{ type: "runs", id: run.friendlyId },
35-
{ type: "tasks", id: run.taskIdentifier },
36-
...run.runTags.map((tag) => ({ type: "tags", id: tag })),
70+
...(run.taskIdentifier ? [{ type: "tasks", id: run.taskIdentifier }] : []),
71+
...run.tags.map((tag) => ({ type: "tags", id: tag })),
3772
];
38-
if (run.batchId) {
39-
resources.push({ type: "batch", id: BatchId.toFriendlyId(run.batchId) });
40-
}
4173
return anyResource(resources);
4274
},
4375
},
4476
},
45-
async ({ resource: run, authentication }) => {
77+
async ({ resource: resolved, authentication }) => {
78+
if (resolved.source === "buffer") {
79+
// Buffered runs have no events ingested yet — the drainer hasn't
80+
// materialised the PG row and the worker hasn't started executing.
81+
// Synthesise a single partial span that satisfies the SDK's
82+
// RetrieveRunTraceResponseBody schema (rootSpan is non-nullable).
83+
const buffered = resolved.run;
84+
return json(
85+
{
86+
trace: {
87+
traceId: buffered.traceId ?? "",
88+
rootSpan: {
89+
id: buffered.spanId ?? "",
90+
runId: buffered.friendlyId,
91+
data: {
92+
message: buffered.taskIdentifier ?? "",
93+
taskSlug: buffered.taskIdentifier ?? undefined,
94+
events: [],
95+
startTime: buffered.createdAt,
96+
duration: 0,
97+
isError: false,
98+
isPartial: true,
99+
isCancelled: buffered.status === "CANCELED",
100+
level: "TRACE",
101+
queueName: buffered.queue ?? undefined,
102+
machinePreset: buffered.machinePreset ?? undefined,
103+
},
104+
children: [],
105+
},
106+
},
107+
},
108+
{ status: 200 }
109+
);
110+
}
111+
112+
const run = resolved.run;
46113
const eventRepository = await getEventRepositoryForStore(
47114
run.taskEventStore,
48115
authentication.environment.organization.id

0 commit comments

Comments
 (0)