Skip to content

Commit 7ab766a

Browse files
d-csclaude
andcommitted
feat(webapp): mollifier API mutations on buffered runs
Cancel, replay, reschedule, metadata, tags, and idempotency-key-reset now succeed against a run that's still in the mollifier buffer. Mutations are applied to the buffered snapshot via Lua CAS; the drainer carries the mutation forward when it replays. Stacked on the reads PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c8ab214 commit 7ab766a

12 files changed

Lines changed: 1233 additions & 120 deletions

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 144 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,101 @@
1+
import type { LoaderFunctionArgs } from "@remix-run/server-runtime";
12
import { json } from "@remix-run/server-runtime";
23
import { tryCatch } from "@trigger.dev/core/utils";
4+
import type { RunMetadataChangeOperation } from "@trigger.dev/core/v3/schemas";
35
import { UpdateMetadataRequestBody } from "@trigger.dev/core/v3";
46
import { z } from "zod";
7+
import { $replica } from "~/db.server";
8+
import { authenticateApiRequest } from "~/services/apiAuth.server";
59
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
610
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
711
import { ServiceValidationError } from "~/v3/services/common.server";
12+
import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server";
13+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
814

915
const ParamsSchema = z.object({
1016
runId: z.string(),
1117
});
1218

19+
// Phase A6 — fixes the pre-existing route bug where GET on this URL
20+
// returned a Remix "no loader" 400. The route only exposed PUT (update);
21+
// GET had no handler. Returns `{ metadata, metadataType }` from either
22+
// the Postgres row or the mollifier buffer snapshot.
23+
export async function loader({ request, params }: LoaderFunctionArgs) {
24+
const authenticationResult = await authenticateApiRequest(request);
25+
if (!authenticationResult) {
26+
return json({ error: "Invalid or Missing API Key" }, { status: 401 });
27+
}
28+
29+
const parsed = ParamsSchema.safeParse(params);
30+
if (!parsed.success) {
31+
return json({ error: "Invalid or missing run ID" }, { status: 400 });
32+
}
33+
34+
const env = authenticationResult.environment;
35+
36+
const pgRun = await $replica.taskRun.findFirst({
37+
where: { friendlyId: parsed.data.runId, runtimeEnvironmentId: env.id },
38+
select: { metadata: true, metadataType: true },
39+
});
40+
if (pgRun) {
41+
return json({ metadata: pgRun.metadata, metadataType: pgRun.metadataType }, { status: 200 });
42+
}
43+
44+
const buffered = await findRunByIdWithMollifierFallback({
45+
runId: parsed.data.runId,
46+
environmentId: env.id,
47+
organizationId: env.organizationId,
48+
});
49+
if (buffered) {
50+
return json(
51+
{
52+
metadata: buffered.metadata ?? null,
53+
metadataType: buffered.metadataType ?? "application/json",
54+
},
55+
{ status: 200 }
56+
);
57+
}
58+
59+
return json({ error: "Run not found" }, { status: 404 });
60+
}
61+
62+
// Route parent/root operations to the existing PG service by directly
63+
// invoking it against the parent/root runId. The service ingests via
64+
// its batching worker, which targets PG by id. If the parent/root is
65+
// itself buffered we recurse through our buffered-mutation helper.
66+
// `_ingestion_only` flag: a synthetic body that has the operations
67+
// promoted to top-level `operations` so the service applies them to
68+
// `targetRunId` directly.
69+
async function routeOperationsToRun(
70+
targetRunId: string | undefined,
71+
operations: RunMetadataChangeOperation[] | undefined,
72+
env: { id: string; organizationId: string }
73+
): Promise<void> {
74+
if (!targetRunId || !operations || operations.length === 0) return;
75+
76+
// Try PG first via the existing service (this is how parent/root
77+
// operations have always landed; preserve that).
78+
const [error] = await tryCatch(
79+
updateMetadataService.call(
80+
targetRunId,
81+
{ operations },
82+
{ id: env.id, organizationId: env.organizationId } as unknown as Parameters<
83+
typeof updateMetadataService.call
84+
>[2]
85+
)
86+
);
87+
if (!error) return;
88+
89+
// PG service threw — could be "Cannot update metadata for a completed
90+
// run" or similar. If the target is buffered, route operations to its
91+
// snapshot too. Best-effort; do not surface this failure to the
92+
// caller — the parent/root ops are auxiliary.
93+
await applyMetadataMutationToBufferedRun({
94+
runId: targetRunId,
95+
body: { operations },
96+
});
97+
}
98+
1399
const { action } = createActionApiRoute(
14100
{
15101
params: ParamsSchema,
@@ -18,23 +104,72 @@ const { action } = createActionApiRoute(
18104
method: "PUT",
19105
},
20106
async ({ authentication, body, params }) => {
21-
const [error, result] = await tryCatch(
22-
updateMetadataService.call(params.runId, body, authentication.environment)
23-
);
107+
const env = authentication.environment;
108+
const runId = params.runId;
24109

25-
if (error) {
26-
if (error instanceof ServiceValidationError) {
27-
return json({ error: error.message }, { status: error.status ?? 422 });
110+
// PG-canonical path. If the run is in PG, the existing service
111+
// owns the full request shape including parent/root operations,
112+
// metadataVersion CAS, batching, validation — none of which the
113+
// buffer side needs to reimplement.
114+
const [pgError, pgResult] = await tryCatch(
115+
updateMetadataService.call(runId, body, env)
116+
);
117+
if (pgError) {
118+
if (pgError instanceof ServiceValidationError) {
119+
return json({ error: pgError.message }, { status: pgError.status ?? 422 });
28120
}
29-
30121
return json({ error: "Internal Server Error" }, { status: 500 });
31122
}
123+
if (pgResult) {
124+
return json(pgResult, { status: 200 });
125+
}
32126

33-
if (!result) {
127+
// PG miss. Target run is either buffered or genuinely absent.
128+
const bufferOutcome = await applyMetadataMutationToBufferedRun({
129+
runId,
130+
body: { metadata: body.metadata, operations: body.operations },
131+
});
132+
133+
if (bufferOutcome.kind === "not_found") {
34134
return json({ error: "Task Run not found" }, { status: 404 });
35135
}
136+
if (bufferOutcome.kind === "busy") {
137+
// Entry is materialising. Best path is to retry the PG call —
138+
// the row may be visible now. We don't waste a roundtrip in
139+
// the happy path, but a 503 here would be customer-visible
140+
// breakage for legitimately-burst workloads. Hand back 503 with
141+
// a retry hint; SDK retry policy converges.
142+
return json({ error: "Run materialising, retry shortly" }, { status: 503 });
143+
}
144+
if (bufferOutcome.kind === "version_exhausted") {
145+
// Pathological contention — many concurrent metadata writers on
146+
// the same buffered runId. Surface as 503 rather than silently
147+
// dropping the request.
148+
return json({ error: "Metadata write contention; retry shortly" }, { status: 503 });
149+
}
150+
151+
// Buffered metadata mutation succeeded. Fan parent/root operations
152+
// out to their respective runs (parent/root are typically PG-
153+
// materialised by the time the child is buffered, so the existing
154+
// service handles them; if they're also buffered, the helper
155+
// recurses through the buffered mutation path).
156+
const bufferedEntry = await findRunByIdWithMollifierFallback({
157+
runId,
158+
environmentId: env.id,
159+
organizationId: env.organizationId,
160+
});
161+
if (bufferedEntry) {
162+
await Promise.all([
163+
routeOperationsToRun(bufferedEntry.parentTaskRunId, body.parentOperations, env),
164+
// The snapshot doesn't carry rootTaskRunId; fall back to parent
165+
// as a rough proxy (matches the existing service's nil-coalesce
166+
// behaviour where rootTaskRun defaults to the parent). Phase D
167+
// / future work could thread rootTaskRunId through the snapshot.
168+
routeOperationsToRun(bufferedEntry.parentTaskRunId, body.rootOperations, env),
169+
]);
170+
}
36171

37-
return json(result, { status: 200 });
172+
return json({ metadata: bufferOutcome.newMetadata }, { status: 200 });
38173
}
39174
);
40175

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@ import { z } from "zod";
44
import { prisma } from "~/db.server";
55
import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
66
import { authenticateApiRequest } from "~/services/apiAuth.server";
7+
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
78
import { logger } from "~/services/logger.server";
9+
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
810

911
const ParamsSchema = z.object({
1012
runId: z.string(),
1113
});
1214

1315
export async function action({ request, params }: ActionFunctionArgs) {
14-
// Ensure this is a POST request
1516
if (request.method.toUpperCase() !== "POST") {
1617
return { status: 405, body: "Method Not Allowed" };
1718
}
1819

19-
// Authenticate the request
2020
const authenticationResult = await authenticateApiRequest(request);
2121
if (!authenticationResult) {
2222
return json({ error: "Invalid or Missing API Key" }, { status: 401 });
@@ -32,59 +32,67 @@ export async function action({ request, params }: ActionFunctionArgs) {
3232

3333
try {
3434
const anyBody = await request.json();
35-
3635
const body = AddTagsRequestBody.safeParse(anyBody);
3736
if (!body.success) {
3837
return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 });
3938
}
40-
41-
const run = await prisma.taskRun.findFirst({
42-
where: {
43-
friendlyId: parsedParams.data.runId,
44-
runtimeEnvironmentId: authenticationResult.environment.id,
45-
},
46-
select: {
47-
runTags: true,
48-
},
49-
});
50-
51-
const existingTags = run?.runTags ?? [];
52-
53-
//remove duplicate tags from the new tags
5439
const bodyTags = typeof body.data.tags === "string" ? [body.data.tags] : body.data.tags;
55-
const newTags = bodyTags.filter((tag) => {
56-
if (tag.trim().length === 0) return false;
57-
return !existingTags.includes(tag);
58-
});
59-
60-
if (existingTags.length + newTags.length > MAX_TAGS_PER_RUN) {
61-
return json(
62-
{
63-
error: `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${
64-
existingTags.length + newTags.length
65-
}. These tags have not been set: ${newTags.map((t) => `'${t}'`).join(", ")}.`,
66-
},
67-
{ status: 422 }
68-
);
69-
}
40+
const nonEmptyTags = bodyTags.filter((t) => t.trim().length > 0);
7041

71-
if (newTags.length === 0) {
42+
if (nonEmptyTags.length === 0) {
7243
return json({ message: "No new tags to add" }, { status: 200 });
7344
}
7445

75-
await prisma.taskRun.update({
76-
where: {
77-
friendlyId: parsedParams.data.runId,
78-
runtimeEnvironmentId: authenticationResult.environment.id,
79-
},
80-
data: {
81-
runTags: {
82-
push: newTags,
83-
},
46+
const env = authenticationResult.environment;
47+
const outcome = await mutateWithFallback({
48+
runId: parsedParams.data.runId,
49+
environmentId: env.id,
50+
organizationId: env.organizationId,
51+
bufferPatch: { type: "append_tags", tags: nonEmptyTags },
52+
pgMutation: async (taskRun) => {
53+
const existing = taskRun.runTags ?? [];
54+
const newTags = nonEmptyTags.filter((t) => !existing.includes(t));
55+
56+
if (existing.length + newTags.length > MAX_TAGS_PER_RUN) {
57+
return json(
58+
{
59+
error: `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${
60+
existing.length + newTags.length
61+
}. These tags have not been set: ${newTags.map((t) => `'${t}'`).join(", ")}.`,
62+
},
63+
{ status: 422 }
64+
);
65+
}
66+
if (newTags.length === 0) {
67+
return json({ message: "No new tags to add" }, { status: 200 });
68+
}
69+
await prisma.taskRun.update({
70+
where: {
71+
id: taskRun.id,
72+
runtimeEnvironmentId: env.id,
73+
},
74+
data: { runTags: { push: newTags } },
75+
});
76+
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
8477
},
78+
// Buffer-applied patch path. The mutateSnapshot Lua deduplicates
79+
// against existing snapshot tags atomically. MAX_TAGS_PER_RUN
80+
// enforcement is skipped on the buffered side — the drainer's
81+
// engine.trigger writes the PG row without enforcement either,
82+
// matching today's pre-buffer trigger semantics. A future
83+
// refinement could push the limit check into the Lua.
84+
synthesisedResponse: () =>
85+
json({ message: `Successfully set ${nonEmptyTags.length} new tags.` }, { status: 200 }),
86+
abortSignal: getRequestAbortSignal(),
8587
});
8688

87-
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
89+
if (outcome.kind === "not_found") {
90+
return json({ error: "Run not found" }, { status: 404 });
91+
}
92+
if (outcome.kind === "timed_out") {
93+
return json({ error: "Run materialisation timed out" }, { status: 503 });
94+
}
95+
return outcome.response;
8896
} catch (error) {
8997
logger.error("Failed to add run tags", { error });
9098
return json({ error: "Something went wrong, please try again." }, { status: 500 });

apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { json } from "@remix-run/server-runtime";
3+
import type { TaskRun } from "@trigger.dev/database";
34
import { z } from "zod";
45
import { prisma } from "~/db.server";
56
import { authenticateApiRequest } from "~/services/apiAuth.server";
67
import { logger } from "~/services/logger.server";
78
import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server";
9+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
810
import { sanitizeTriggerSource } from "~/utils/triggerSource";
911

1012
const ParamsSchema = z.object({
@@ -32,12 +34,34 @@ export async function action({ request, params }: ActionFunctionArgs) {
3234
const { runParam } = parsed.data;
3335

3436
try {
35-
const taskRun = await prisma.taskRun.findUnique({
37+
const env = authenticationResult.environment;
38+
// PG-first. Replay works on any status per audit (Q2 design) — no
39+
// filter beyond friendlyId is the existing semantic; findFirst with
40+
// env scoping tightens it minimally without changing behaviour for
41+
// a correctly-authed caller.
42+
let taskRun: TaskRun | null = await prisma.taskRun.findFirst({
3643
where: {
3744
friendlyId: runParam,
45+
runtimeEnvironmentId: env.id,
3846
},
3947
});
4048

49+
if (!taskRun) {
50+
// Buffered fallback (Q2). The SyntheticRun shape was extended in
51+
// Phase B4 to carry every field ReplayTaskRunService reads from a
52+
// TaskRun. Cast through unknown — the synthesised object has the
53+
// same field surface as a real PG row from the service's
54+
// perspective.
55+
const buffered = await findRunByIdWithMollifierFallback({
56+
runId: runParam,
57+
environmentId: env.id,
58+
organizationId: env.organizationId,
59+
});
60+
if (buffered) {
61+
taskRun = buffered as unknown as TaskRun;
62+
}
63+
}
64+
4165
if (!taskRun) {
4266
return json({ error: "Run not found" }, { status: 404 });
4367
}

0 commit comments

Comments
 (0)