diff --git a/.changeset/mollifier-tag-cap.md b/.changeset/mollifier-tag-cap.md new file mode 100644 index 0000000000..b9057664fa --- /dev/null +++ b/.changeset/mollifier-tag-cap.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/redis-worker": patch +--- + +Mollifier `mutateSnapshot` now enforces a tag cap: an `append_tags` patch carrying `maxTags` returns `"limit_exceeded"` (writing nothing) when the deduped tag count would exceed the limit, so a buffered run can't accumulate more tags via the tags API than the trigger validator allows at creation. diff --git a/.server-changes/mollifier-mutations.md b/.server-changes/mollifier-mutations.md new file mode 100644 index 0000000000..d0d5a969cb --- /dev/null +++ b/.server-changes/mollifier-mutations.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Mollifier API mutations on buffered runs: tag, metadata, replay, reschedule, cancel, and idempotency-key reset via a buffer-snapshot fallback. When a mutation races a mid-drain run, the wait-and-bounce loop watches the buffer entry in Redis (cheap) and reads the primary exactly once for the actual mutation, instead of polling the writer on a fixed cadence; polls use jittered exponential backoff. diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts index f27a9c13f9..1f5296b04d 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts @@ -1,15 +1,140 @@ +import type { LoaderFunctionArgs } from "@remix-run/server-runtime"; import { json } from "@remix-run/server-runtime"; import { tryCatch } from "@trigger.dev/core/utils"; +import type { RunMetadataChangeOperation } from "@trigger.dev/core/v3/schemas"; import { UpdateMetadataRequestBody } from "@trigger.dev/core/v3"; import { z } from "zod"; +import { $replica } from "~/db.server"; +// Aliased to avoid shadowing the local `env: AuthenticatedEnvironment` +// parameter the route handler and `routeOperationsToRun` use. +import { env as appEnv } from "~/env.server"; +import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { authenticateApiRequest } from "~/services/apiAuth.server"; +import { logger } from "~/services/logger.server"; import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { ServiceValidationError } from "~/v3/services/common.server"; +import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runId: z.string(), }); +// GET handler added to fix the pre-existing route bug where this URL +// returned a Remix "no loader" 400 — only PUT (update) was exported, so +// GET had no handler. Returns `{ metadata, metadataType }` from either +// the Postgres row or the mollifier buffer snapshot. +export async function loader({ request, params }: LoaderFunctionArgs) { + const authenticationResult = await authenticateApiRequest(request); + if (!authenticationResult) { + return json({ error: "Invalid or Missing API Key" }, { status: 401 }); + } + + const parsed = ParamsSchema.safeParse(params); + if (!parsed.success) { + return json({ error: "Invalid or missing run ID" }, { status: 400 }); + } + + const env = authenticationResult.environment; + + const pgRun = await $replica.taskRun.findFirst({ + where: { friendlyId: parsed.data.runId, runtimeEnvironmentId: env.id }, + select: { metadata: true, metadataType: true }, + }); + if (pgRun) { + return json({ metadata: pgRun.metadata, metadataType: pgRun.metadataType }, { status: 200 }); + } + + const buffered = await findRunByIdWithMollifierFallback({ + runId: parsed.data.runId, + environmentId: env.id, + organizationId: env.organizationId, + }); + if (buffered) { + return json( + { + metadata: buffered.metadata ?? null, + metadataType: buffered.metadataType ?? "application/json", + }, + { status: 200 } + ); + } + + return json({ error: "Run not found" }, { status: 404 }); +} + +// Route parent/root operations to the existing PG service by directly +// invoking it against the parent/root runId. The service ingests via +// its batching worker, which targets PG by id. If the parent/root is +// itself buffered we recurse through our buffered-mutation helper. +// `_ingestion_only` flag: a synthetic body that has the operations +// promoted to top-level `operations` so the service applies them to +// `targetRunId` directly. +async function routeOperationsToRun( + targetRunId: string | undefined, + operations: RunMetadataChangeOperation[] | undefined, + env: AuthenticatedEnvironment +): Promise { + if (!targetRunId || !operations || operations.length === 0) return; + + // Try PG first via the existing service (this is how parent/root + // operations have always landed; preserve that). Accepts the full + // AuthenticatedEnvironment so we don't have to recover the unsafe + // `as unknown` cast that the previous narrowed `{ id, organizationId }` + // signature forced on us. + // + // Two non-success outcomes from `call`: + // * throws — PG threw (e.g. "Cannot update metadata for a completed + // run", or a transient PG outage). + // * resolves with undefined — PG row didn't exist (the target may be + // buffered, not yet materialised). + // Either way we want to try the buffer fallback below; treating the + // undefined-return as success would make the fallback unreachable. + const [error, result] = await tryCatch( + updateMetadataService.call(targetRunId, { operations }, env) + ); + if (!error && result !== undefined) return; + + if (error) { + // PG threw — auxiliary op, stay best-effort and don't surface this + // to the caller (the caller's primary mutation already landed). But + // warn so a genuine PG outage on these ops isn't invisible. + logger.warn("metadata route: parent/root PG op failed", { + targetRunId, + error: error instanceof Error ? error.message : String(error), + }); + } + + // Buffer fallback only makes sense for friendlyId-keyed entries. The + // PG-side parent/root IDs are internal cuids; the buffer keys entries + // by friendlyId, so passing the internal id would silently no-op. + // Skip explicitly — a buffered child's parent is always materialised + // in PG already (a buffered run hasn't executed, so it can't have + // triggered the child), so the buffered-parent branch isn't actually + // reachable. Treating the no-op as intentional rather than incidental. + if (!targetRunId.startsWith("run_")) return; + + // Best-effort buffer fallback. Wrap so a transient Redis throw on + // this auxiliary op can't 500 the request after the primary mutation + // already succeeded. + const [bufferError] = await tryCatch( + applyMetadataMutationToBufferedRun({ + runId: targetRunId, + environmentId: env.id, + organizationId: env.organizationId, + maximumSize: appEnv.TASK_RUN_METADATA_MAXIMUM_SIZE, + body: { operations }, + }) + ); + if (bufferError) { + logger.warn("metadata route: buffer fallback for parent/root op failed", { + targetRunId, + error: bufferError instanceof Error ? bufferError.message : String(bufferError), + }); + } +} + const { action } = createActionApiRoute( { params: ParamsSchema, @@ -18,23 +143,98 @@ const { action } = createActionApiRoute( method: "PUT", }, async ({ authentication, body, params }) => { - const [error, result] = await tryCatch( - updateMetadataService.call(params.runId, body, authentication.environment) - ); + const env = authentication.environment; + const runId = params.runId; - if (error) { - if (error instanceof ServiceValidationError) { - return json({ error: error.message }, { status: error.status ?? 422 }); + // PG-canonical path. If the run is in PG, the existing service + // owns the full request shape including parent/root operations, + // metadataVersion CAS, batching, validation — none of which the + // buffer side needs to reimplement. + const [pgError, pgResult] = await tryCatch( + updateMetadataService.call(runId, body, env) + ); + if (pgError) { + if (pgError instanceof ServiceValidationError) { + return json({ error: pgError.message }, { status: pgError.status ?? 422 }); } - return json({ error: "Internal Server Error" }, { status: 500 }); } + if (pgResult) { + return json(pgResult, { status: 200 }); + } + + // PG miss. Target run is either buffered or genuinely absent. + const bufferOutcome = await applyMetadataMutationToBufferedRun({ + runId, + environmentId: env.id, + organizationId: env.organizationId, + maximumSize: appEnv.TASK_RUN_METADATA_MAXIMUM_SIZE, + body: { metadata: body.metadata, operations: body.operations }, + }); - if (!result) { + if (bufferOutcome.kind === "not_found") { return json({ error: "Task Run not found" }, { status: 404 }); } + if (bufferOutcome.kind === "metadata_too_large") { + // Mirror PG's `MetadataTooLargeError` (413). + return json( + { + error: `Metadata exceeds maximum size of ${bufferOutcome.maximumSize} bytes`, + }, + { status: 413 } + ); + } + if (bufferOutcome.kind === "busy") { + // Entry is materialising. Best path is to retry the PG call — + // the row may be visible now. We don't waste a roundtrip in + // the happy path, but a 503 here would be customer-visible + // breakage for legitimately-burst workloads. Hand back 503 with + // a retry hint; SDK retry policy converges. + return json({ error: "Run materialising, retry shortly" }, { status: 503 }); + } + if (bufferOutcome.kind === "version_exhausted") { + // Pathological contention — many concurrent metadata writers on + // the same buffered runId. Surface as 503 rather than silently + // dropping the request. + return json({ error: "Metadata write contention; retry shortly" }, { status: 503 }); + } + + // Buffered metadata mutation succeeded. Fan parent/root operations + // out to their respective runs (parent/root are typically PG- + // materialised by the time the child is buffered, so the existing + // service handles them; if they're also buffered, the helper + // recurses through the buffered mutation path). + const bufferedEntry = await findRunByIdWithMollifierFallback({ + runId, + environmentId: env.id, + organizationId: env.organizationId, + }); + if (bufferedEntry) { + // Both parent and root use the friendlyIds derived in + // `readFallback.server.ts` via `internalRunIdToFriendlyId` from the + // internal IDs the engine snapshot carries (`parentTaskRunId` / + // `rootTaskRunId`). The PG-side `UpdateMetadataService` would + // route to `taskRun.parentTaskRun?.id ?? taskRun.id` and + // `taskRun.rootTaskRun?.id ?? taskRun.id` respectively — i.e. fall + // back to the run itself when there's no parent / root. Mirror + // that self-fallback with `?? runId` so a top-level run's + // parent/root ops land on itself (matching PG semantics) instead + // of being silently dropped. + await Promise.all([ + routeOperationsToRun( + bufferedEntry.parentTaskRunFriendlyId ?? runId, + body.parentOperations, + env, + ), + routeOperationsToRun( + bufferedEntry.rootTaskRunFriendlyId ?? runId, + body.rootOperations, + env, + ), + ]); + } - return json(result, { status: 200 }); + return json({ metadata: bufferOutcome.newMetadata }, { status: 200 }); } ); diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts index eae94375b9..ef7f3180bf 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts @@ -1,22 +1,39 @@ import { type ActionFunctionArgs, json } from "@remix-run/server-runtime"; import { AddTagsRequestBody } from "@trigger.dev/core/v3"; +import type { BufferEntry } from "@trigger.dev/redis-worker"; import { z } from "zod"; import { prisma } from "~/db.server"; import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { logger } from "~/services/logger.server"; +import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server"; + +// Pull the existing tags out of a buffer entry's serialised payload so +// the buffer-path response can dedup against them, matching the +// PG-path's `newTags.length` count rather than the pre-dedup input +// count. Returns null on any parse failure / shape mismatch so the +// caller can fall back gracefully. +function parseSnapshotTags(entry: BufferEntry | null): string[] | null { + if (!entry) return null; + try { + const snapshot = JSON.parse(entry.payload) as { tags?: unknown }; + if (!Array.isArray(snapshot.tags)) return null; + return snapshot.tags.filter((t): t is string => typeof t === "string"); + } catch { + return null; + } +} const ParamsSchema = z.object({ runId: z.string(), }); export async function action({ request, params }: ActionFunctionArgs) { - // Ensure this is a POST request if (request.method.toUpperCase() !== "POST") { return { status: 405, body: "Method Not Allowed" }; } - // Authenticate the request const authenticationResult = await authenticateApiRequest(request); if (!authenticationResult) { return json({ error: "Invalid or Missing API Key" }, { status: 401 }); @@ -32,59 +49,89 @@ export async function action({ request, params }: ActionFunctionArgs) { try { const anyBody = await request.json(); - const body = AddTagsRequestBody.safeParse(anyBody); if (!body.success) { return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 }); } - - const run = await prisma.taskRun.findFirst({ - where: { - friendlyId: parsedParams.data.runId, - runtimeEnvironmentId: authenticationResult.environment.id, - }, - select: { - runTags: true, - }, - }); - - const existingTags = run?.runTags ?? []; - - //remove duplicate tags from the new tags const bodyTags = typeof body.data.tags === "string" ? [body.data.tags] : body.data.tags; - const newTags = bodyTags.filter((tag) => { - if (tag.trim().length === 0) return false; - return !existingTags.includes(tag); - }); - - if (existingTags.length + newTags.length > MAX_TAGS_PER_RUN) { - return json( - { - error: `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${ - existingTags.length + newTags.length - }. These tags have not been set: ${newTags.map((t) => `'${t}'`).join(", ")}.`, - }, - { status: 422 } - ); - } + const nonEmptyTags = bodyTags.filter((t) => t.trim().length > 0); - if (newTags.length === 0) { + if (nonEmptyTags.length === 0) { return json({ message: "No new tags to add" }, { status: 200 }); } - await prisma.taskRun.update({ - where: { - friendlyId: parsedParams.data.runId, - runtimeEnvironmentId: authenticationResult.environment.id, + const env = authenticationResult.environment; + const outcome = await mutateWithFallback({ + runId: parsedParams.data.runId, + environmentId: env.id, + organizationId: env.organizationId, + bufferPatch: { type: "append_tags", tags: nonEmptyTags, maxTags: MAX_TAGS_PER_RUN }, + pgMutation: async (taskRun) => { + const existing = taskRun.runTags ?? []; + const newTags = nonEmptyTags.filter((t) => !existing.includes(t)); + + if (existing.length + newTags.length > MAX_TAGS_PER_RUN) { + return json( + { + error: `Runs can only have ${MAX_TAGS_PER_RUN} tags, you're trying to set ${ + existing.length + newTags.length + }. These tags have not been set: ${newTags.map((t) => `'${t}'`).join(", ")}.`, + }, + { status: 422 } + ); + } + if (newTags.length === 0) { + return json({ message: "No new tags to add" }, { status: 200 }); + } + await prisma.taskRun.update({ + where: { + id: taskRun.id, + runtimeEnvironmentId: env.id, + }, + data: { runTags: { push: newTags } }, + }); + return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 }); }, - data: { - runTags: { - push: newTags, - }, + // Buffer-applied patch path. The mutateSnapshot Lua deduplicates + // against existing snapshot tags atomically and enforces + // MAX_TAGS_PER_RUN via the `maxTags` we pass in `bufferPatch` — + // matching the PG-path cap above so a buffered run can't exceed the + // limit the trigger validator applies at creation. + // + // Dedup the success-count off the pre-mutation entry (already + // fetched by mutateWithFallback's env-auth pre-check, so no extra + // Redis read) so the message reports the same `newTags.length` the + // PG path reports — not the pre-dedup request count, which would + // give an inconsistent number across the buffered/materialised + // boundary for the same input. + synthesisedResponse: ({ bufferEntry }) => { + const existing = parseSnapshotTags(bufferEntry); + const newTagsCount = existing + ? nonEmptyTags.filter((t) => !existing.includes(t)).length + : nonEmptyTags.length; + return json( + { message: `Successfully set ${newTagsCount} new tags.` }, + { status: 200 } + ); }, + // Buffer rejected the append because it would exceed the cap. We + // don't know the exact deduped overflow count here (the Lua does), + // so report the limit rather than a precise "trying to set N". + rejectedResponse: () => + json( + { error: `Runs can only have ${MAX_TAGS_PER_RUN} tags.` }, + { status: 422 } + ), + abortSignal: getRequestAbortSignal(), }); - return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 }); + if (outcome.kind === "not_found") { + return json({ error: "Run not found" }, { status: 404 }); + } + if (outcome.kind === "timed_out") { + return json({ error: "Run materialisation timed out" }, { status: 503 }); + } + return outcome.response; } catch (error) { logger.error("Failed to add run tags", { error }); return json({ error: "Something went wrong, please try again." }, { status: 500 }); diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts index 72ad202467..4bb5922997 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts @@ -1,10 +1,12 @@ import type { ActionFunctionArgs } from "@remix-run/server-runtime"; import { json } from "@remix-run/server-runtime"; +import type { TaskRun } from "@trigger.dev/database"; import { z } from "zod"; import { prisma } from "~/db.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; import { sanitizeTriggerSource } from "~/utils/triggerSource"; const ParamsSchema = z.object({ @@ -12,6 +14,39 @@ const ParamsSchema = z.object({ runParam: z.string(), }); +// Subset of TaskRun fields that ReplayTaskRunService.call actually +// reads from `existingTaskRun`. Validate the buffered fallback against +// this before casting to TaskRun so a buffer-format drift surfaces as a +// 404/422 here rather than as a silent NaN/undefined deep inside +// replay. The full TaskRun type has many more fields the service never +// touches; we only assert the ones it reads. +const BufferedReplayInputSchema = z.object({ + id: z.string(), + friendlyId: z.string(), + runtimeEnvironmentId: z.string(), + taskIdentifier: z.string(), + payload: z.string(), + payloadType: z.string(), + queue: z.string(), + isTest: z.boolean(), + traceId: z.string(), + spanId: z.string(), + engine: z.string(), + runTags: z.array(z.string()), + // Nullable / optional fields the service tolerates via `??` fallbacks. + concurrencyKey: z.string().nullable().optional(), + workerQueue: z.string().nullable().optional(), + machinePreset: z.string().nullable().optional(), + realtimeStreamsVersion: z.string().nullable().optional(), + // ReplayTaskRunService.getExistingMetadata reads these to preserve + // the original run's metadata on replay. Without them in the schema + // they'd be stripped by Zod's default key-passthrough behaviour, and + // a buffered-source replay would silently lose metadata that a + // PG-source replay carries over. + seedMetadata: z.string().nullable().optional(), + seedMetadataType: z.string().nullable().optional(), +}); + export async function action({ request, params }: ActionFunctionArgs) { // Ensure this is a POST request if (request.method.toUpperCase() !== "POST") { @@ -32,12 +67,57 @@ export async function action({ request, params }: ActionFunctionArgs) { const { runParam } = parsed.data; try { - const taskRun = await prisma.taskRun.findUnique({ + const env = authenticationResult.environment; + // PG-first. Replay works on any status per audit — no + // filter beyond friendlyId is the existing semantic; findFirst with + // env scoping tightens it minimally without changing behaviour for + // a correctly-authed caller. + let taskRun: TaskRun | null = await prisma.taskRun.findFirst({ where: { friendlyId: runParam, + runtimeEnvironmentId: env.id, }, }); + if (!taskRun) { + // Buffered fallback. SyntheticRun carries every field + // ReplayTaskRunService reads from a TaskRun. Validate the subset of + // fields the service consumes (BufferedReplayInputSchema above) + // before casting; a schema mismatch surfaces as a 404 here rather + // than as a silent undefined deep inside the service. + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: env.id, + organizationId: env.organizationId, + }); + if (buffered) { + const parsed = BufferedReplayInputSchema.safeParse(buffered); + if (parsed.success) { + // Manual sync point: `BufferedReplayInputSchema` covers only + // the subset of `TaskRun` fields `ReplayTaskRunService.call` + // currently reads from `existingTaskRun`. The cast is `as + // unknown as TaskRun` because the full `TaskRun` type carries + // ~40 fields the service never touches; mirroring all of them + // on a synthetic snapshot would be misleading. If a future + // change to `ReplayTaskRunService` reads an additional + // `existingTaskRun` field, **add it to the schema above** — + // otherwise the buffered path will silently feed the service + // `undefined` for that field while the PG-source replay + // works. The `safeParse` + warn-log + 404 below is the + // run-time fail-safe; this comment is the design fail-safe. + taskRun = parsed.data as unknown as TaskRun; + } else { + logger.warn("replay: buffered fallback failed schema validation", { + runParam, + issues: parsed.error.issues.map((issue) => ({ + path: issue.path.join("."), + code: issue.code, + })), + }); + } + } + } + if (!taskRun) { return json({ error: "Run not found" }, { status: 404 }); } diff --git a/apps/webapp/app/routes/api.v1.runs.$runParam.reschedule.ts b/apps/webapp/app/routes/api.v1.runs.$runParam.reschedule.ts index 0ac8aec835..fb3db6a34e 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runParam.reschedule.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runParam.reschedule.ts @@ -3,90 +3,137 @@ import { json } from "@remix-run/server-runtime"; import { RescheduleRunRequestBody } from "@trigger.dev/core/v3/schemas"; import { z } from "zod"; import { getApiVersion } from "~/api/versions"; -import { prisma } from "~/db.server"; import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { logger } from "~/services/logger.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { RescheduleTaskRunService } from "~/v3/services/rescheduleTaskRun.server"; +import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { parseDelay } from "~/utils/delays"; const ParamsSchema = z.object({ runParam: z.string(), }); export async function action({ request, params }: ActionFunctionArgs) { - // Ensure this is a POST request if (request.method.toUpperCase() !== "POST") { return { status: 405, body: "Method Not Allowed" }; } - // Authenticate the request const authenticationResult = await authenticateApiRequest(request); - if (!authenticationResult) { return json({ error: "Invalid or missing API Key" }, { status: 401 }); } const parsed = ParamsSchema.safeParse(params); - if (!parsed.success) { return json({ error: "Invalid or missing run ID" }, { status: 400 }); } - const { runParam } = parsed.data; - - const taskRun = await prisma.taskRun.findUnique({ - where: { - friendlyId: runParam, - runtimeEnvironmentId: authenticationResult.environment.id, - }, - }); - - if (!taskRun) { - return json({ error: "Run not found" }, { status: 404 }); - } - const anyBody = await request.json(); - const body = RescheduleRunRequestBody.safeParse(anyBody); - if (!body.success) { return json({ error: "Invalid request body" }, { status: 400 }); } - const service = new RescheduleTaskRunService(); + const env = authenticationResult.environment; + // Pre-resolve the absolute Date the buffer snapshot should encode. + // RescheduleTaskRunService expects this to be present on the body for + // its PG-side flow; for the buffer-side patch we encode the same + // wall-clock value so the drainer's engine.trigger sees the intended + // delayUntil after materialisation. + const delayUntil = await parseDelay(body.data.delay); + if (!delayUntil) { + return json({ error: "Invalid delay value" }, { status: 400 }); + } try { - const updatedRun = await service.call(taskRun, body.data); - - if (!updatedRun) { - return json({ error: "An unknown error occurred" }, { status: 500 }); + // PG-side `RescheduleTaskRunService.call` enforces + // `taskRun.status !== "DELAYED"` and 422s otherwise — without an + // equivalent guard the buffer path would happily inject a + // `delayUntil` into the snapshot of a non-delayed buffered run, and + // the drainer would materialise it with an unintended delay. The + // SyntheticRun type doesn't carry a "DELAYED" enum value because + // it's not a terminal status the trace API needs to express; the + // buffered analogue is `delayUntil` set in the snapshot. Gate on + // that. Race window between read and write is bounded: if the + // drainer materialises mid-call, mutateWithFallback falls through + // to the PG mutation which has its own DELAYED check. + const buffered = await findRunByIdWithMollifierFallback({ + runId: parsed.data.runParam, + environmentId: env.id, + organizationId: env.organizationId, + }); + if (buffered && !buffered.delayUntil) { + return json( + { error: "Cannot reschedule a run that is not delayed" }, + { status: 422 }, + ); } - const run = await ApiRetrieveRunPresenter.findRun( - updatedRun.friendlyId, - authenticationResult.environment - ); - - if (!run) { + const outcome = await mutateWithFallback({ + runId: parsed.data.runParam, + environmentId: env.id, + organizationId: env.organizationId, + bufferPatch: { + type: "set_delay", + delayUntil: delayUntil.toISOString(), + }, + pgMutation: async (taskRun) => { + const service = new RescheduleTaskRunService(); + const updatedRun = await service.call(taskRun, body.data); + if (!updatedRun) { + return json({ error: "An unknown error occurred" }, { status: 500 }); + } + + const run = await ApiRetrieveRunPresenter.findRun(updatedRun.friendlyId, env); + if (!run) { + return json({ error: "Run not found" }, { status: 404 }); + } + const apiVersion = getApiVersion(request); + const presenter = new ApiRetrieveRunPresenter(apiVersion); + const result = await presenter.call(run, env); + if (!result) { + return json({ error: "Run not found" }, { status: 404 }); + } + return json(result); + }, + // Buffered snapshot has been patched. Run it through the same + // ApiRetrieveRunPresenter the PG branch uses (it falls back to + // the buffer for the SyntheticRun lookup) so the response shape + // matches `RetrieveRunResponse` — that's what the SDK's + // `rescheduleRun` zod-validates against. Returning a stripped + // `{ id, delayUntil }` object fails the SDK schema on every + // existing SDK version. + synthesisedResponse: async () => { + const run = await ApiRetrieveRunPresenter.findRun(parsed.data.runParam, env); + if (!run) { + return json({ error: "Run not found" }, { status: 404 }); + } + const apiVersion = getApiVersion(request); + const presenter = new ApiRetrieveRunPresenter(apiVersion); + const result = await presenter.call(run, env); + if (!result) { + return json({ error: "Run not found" }, { status: 404 }); + } + return json(result); + }, + abortSignal: getRequestAbortSignal(), + }); + + if (outcome.kind === "not_found") { return json({ error: "Run not found" }, { status: 404 }); } - - const apiVersion = getApiVersion(request); - - const presenter = new ApiRetrieveRunPresenter(apiVersion); - const result = await presenter.call(run, authenticationResult.environment); - - if (!result) { - return json({ error: "Run not found" }, { status: 404 }); + if (outcome.kind === "timed_out") { + return json({ error: "Run materialisation timed out" }, { status: 503 }); } - - return json(result); + return outcome.response; } catch (error) { if (error instanceof ServiceValidationError) { return json({ error: error.message }, { status: 400 }); } - logger.error("Failed to reschedule run", { error }); return json({ error: "Something went wrong, please try again." }, { status: 500 }); } diff --git a/apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts b/apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts index a636ca0cc1..f02b058b27 100644 --- a/apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts +++ b/apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts @@ -1,8 +1,13 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; -import { $replica } from "~/db.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server"; +import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server"; +import { + resolveRunForMutation, + type ResolvedRunForMutation, +} from "~/v3/mollifier/resolveRunForMutation.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -17,29 +22,55 @@ const { action } = createActionApiRoute( action: "write", resource: (params) => ({ type: "runs", id: params.runParam }), }, - findResource: async (params, auth) => { - return $replica.taskRun.findFirst({ - where: { - friendlyId: params.runParam, - runtimeEnvironmentId: auth.environment.id, - }, - }); - }, + // PG-or-buffer resolver. Returning null here would 404 BEFORE the + // action runs (`apiBuilder.server.ts:321`), so buffered cancels need + // a buffer check at this layer too. Logic lives in a helper so the + // three paths (PG hit, buffer hit, both miss) are unit-tested + // independently of the route builder. The action's mutateWithFallback + // call repeats the lookup atomically — slightly redundant but keeps + // wait-and-bounce semantics intact. + findResource: async (params, auth): Promise => + resolveRunForMutation({ + runParam: params.runParam, + environmentId: auth.environment.id, + organizationId: auth.environment.organizationId, + }), }, - async ({ resource }) => { - if (!resource) { - return json({ error: "Run not found" }, { status: 404 }); - } + async ({ params, authentication }) => { + const runId = params.runParam; + const env = authentication.environment; + const cancelledAt = new Date(); + const cancelReason = "Canceled by user"; - const service = new CancelTaskRunService(); + const outcome = await mutateWithFallback({ + runId, + environmentId: env.id, + organizationId: env.organizationId, + bufferPatch: { + type: "mark_cancelled", + cancelledAt: cancelledAt.toISOString(), + cancelReason, + }, + pgMutation: async (taskRun) => { + const service = new CancelTaskRunService(); + try { + await service.call(taskRun); + } catch { + return json({ error: "Internal Server Error" }, { status: 500 }); + } + return json({ id: taskRun.friendlyId }, { status: 200 }); + }, + synthesisedResponse: () => json({ id: runId }, { status: 200 }), + abortSignal: getRequestAbortSignal(), + }); - try { - await service.call(resource); - } catch (error) { - return json({ error: "Internal Server Error" }, { status: 500 }); + if (outcome.kind === "not_found") { + return json({ error: "Run not found" }, { status: 404 }); } - - return json({ id: resource.friendlyId }, { status: 200 }); + if (outcome.kind === "timed_out") { + return json({ error: "Run materialisation timed out" }, { status: 503 }); + } + return outcome.response; } ); diff --git a/apps/webapp/app/v3/mollifier/applyMetadataMutation.server.ts b/apps/webapp/app/v3/mollifier/applyMetadataMutation.server.ts new file mode 100644 index 0000000000..9cc1831cda --- /dev/null +++ b/apps/webapp/app/v3/mollifier/applyMetadataMutation.server.ts @@ -0,0 +1,151 @@ +import { applyMetadataOperations } from "@trigger.dev/core/v3"; +import type { FlushedRunMetadata } from "@trigger.dev/core/v3/schemas"; +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { logger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; + +export type ApplyMetadataMutationOutcome = + | { kind: "applied"; newMetadata: Record } + | { kind: "not_found" } + | { kind: "busy" } + | { kind: "version_exhausted" } + // Mirrors the PG-side `MetadataTooLargeError` (status 413). Carries + // the limit + observed size so the route can produce a useful body. + | { kind: "metadata_too_large"; maximumSize: number; observedSize: number }; + +// Apply a metadata PUT (body.metadata replace AND/OR body.operations +// deltas) to a buffered run's snapshot. Mirrors the PG-side +// `UpdateMetadataService.#updateRunMetadataWithOperations` retry loop: +// read snapshot → apply operations in JS → CAS-write back with the +// observed `metadataVersion`. Retries on conflict; bounded by +// `maxRetries`. The Lua CAS is the atomicity primitive — concurrent +// callers never lose an increment / append / set. +export async function applyMetadataMutationToBufferedRun(input: { + runId: string; + // Env+org scoping closes a cross-environment write gap on the buffer + // path: the route's PG path is already env-scoped via Prisma filters, + // and this helper now enforces the same isolation before any buffer + // write so a caller authed in env A can't mutate a buffered run that + // belongs to env B. + environmentId: string; + organizationId: string; + // Byte-size cap on the resulting metadata payload, mirroring the + // PG-side `UpdateMetadataService.maximumSize` (sourced from + // `env.TASK_RUN_METADATA_MAXIMUM_SIZE`). Required so the buffer path + // doesn't silently allow writes the PG path would have rejected. + maximumSize: number; + body: Pick; + buffer?: MollifierBuffer | null; + maxRetries?: number; +}): Promise { + const buffer = input.buffer ?? getMollifierBuffer(); + if (!buffer) return { kind: "not_found" }; + + // Default retry budget tuned for buffered-window concurrency. The + // PG-side `UpdateMetadataService` uses 3, which is fine when the only + // writer is the executing task itself. For a buffered run the writers + // are external API callers, and N parallel writers exhaust 3 retries + // quickly under contention. Bumping to 12 covers ~50-way concurrency + // with sub-percent failure probability; the cost is bounded (each + // retry is one Redis Lua call ~1ms). + const maxRetries = input.maxRetries ?? 12; + for (let attempt = 0; attempt <= maxRetries; attempt++) { + const entry = await buffer.getEntry(input.runId); + if (!entry) return { kind: "not_found" }; + // Env+org check: an entry from a different env is treated as a + // miss (not 403) so existence in other envs doesn't leak. + if ( + entry.envId !== input.environmentId || + entry.orgId !== input.organizationId + ) { + return { kind: "not_found" }; + } + if (entry.status !== "QUEUED" || entry.materialised) { + return { kind: "busy" }; + } + + const snapshot = JSON.parse(entry.payload) as Record; + const currentMetadataType = + typeof snapshot.metadataType === "string" ? snapshot.metadataType : "application/json"; + + // Match PG semantics: `body.operations` and `body.metadata` are + // mutually exclusive on a single request. The PG service + // (`UpdateMetadataService.#updateRunMetadata`) branches on + // `Array.isArray(body.operations)` — if operations are present it + // applies them on top of the EXISTING metadata and ignores + // `body.metadata` entirely; otherwise `body.metadata` is the new + // full value. Doing both here would make a request like + // `{ metadata: {b:2}, operations: [set c=3] }` produce + // `{b:2,c:3}` on the buffer vs `{a:1,c:3}` on PG, which silently + // changes semantics across the buffered/materialised boundary. + const parseSnapshotMetadata = (): Record => { + if (typeof snapshot.metadata !== "string") return {}; + try { + return JSON.parse(snapshot.metadata) as Record; + } catch { + return {}; + } + }; + + let metadataObject: Record; + if (input.body.operations?.length) { + // Operations take precedence: apply on top of existing snapshot + // metadata; ignore `body.metadata` to match PG behaviour. + metadataObject = applyMetadataOperations( + parseSnapshotMetadata(), + input.body.operations, + ).newMetadata; + } else if (input.body.metadata !== undefined) { + // No operations — full replace. + metadataObject = input.body.metadata as Record; + } else { + // Neither — write back existing snapshot metadata (no-op shape). + metadataObject = parseSnapshotMetadata(); + } + + const newMetadataStr = JSON.stringify(metadataObject); + + // Size cap — match PG (`handleMetadataPacket` throws + // `MetadataTooLargeError` (413) when the JSON-encoded packet + // exceeds the configured cap). Reject in-loop, before CAS, so a + // single oversize write doesn't churn the retry budget. + const observedSize = Buffer.byteLength(newMetadataStr, "utf8"); + if (observedSize > input.maximumSize) { + return { + kind: "metadata_too_large", + maximumSize: input.maximumSize, + observedSize, + }; + } + + const cas = await buffer.casSetMetadata({ + runId: input.runId, + expectedVersion: entry.metadataVersion, + newMetadata: newMetadataStr, + newMetadataType: currentMetadataType, + }); + + if (cas.kind === "applied") { + return { kind: "applied", newMetadata: metadataObject }; + } + if (cas.kind === "not_found") return { kind: "not_found" }; + if (cas.kind === "busy") return { kind: "busy" }; + // version_conflict — another caller wrote between our read + CAS. + // Small jittered backoff so a thundering herd of N retriers doesn't + // all re-read + re-CAS at exactly the same moment. + logger.debug("applyMetadataMutationToBufferedRun: version_conflict, retrying", { + runId: input.runId, + attempt, + observedVersion: entry.metadataVersion, + currentVersion: cas.currentVersion, + }); + const backoffMs = Math.floor(Math.random() * (5 + attempt * 5)); + await new Promise((resolve) => setTimeout(resolve, backoffMs)); + } + + logger.warn("applyMetadataMutationToBufferedRun: retries exhausted", { + runId: input.runId, + maxRetries, + }); + return { kind: "version_exhausted" }; +} diff --git a/apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts b/apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts new file mode 100644 index 0000000000..57b0e6498a --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts @@ -0,0 +1,235 @@ +import type { + BufferEntry, + MollifierBuffer, + MutateSnapshotResult, + SnapshotPatch, +} from "@trigger.dev/redis-worker"; +import type { TaskRun } from "@trigger.dev/database"; +import { prisma, $replica } from "~/db.server"; +import { logger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; + +// Wait/retry knobs. Exported for tests. +export const DEFAULT_SAFETY_NET_MS = 2_000; +// Initial gap between buffer polls; grows by BACKOFF_FACTOR up to +// DEFAULT_MAX_POLL_STEP_MS so a slow drain doesn't poll at a tight fixed +// cadence for the whole safety-net budget. +export const DEFAULT_POLL_STEP_MS = 20; +export const DEFAULT_MAX_POLL_STEP_MS = 250; +const BACKOFF_FACTOR = 1.7; + +export type MutateWithFallbackInput = { + runId: string; + environmentId: string; + organizationId: string; + bufferPatch: SnapshotPatch; + // Called when a PG row exists (either replica-hit or post-wait writer-hit). + // Receives the full TaskRun shape and returns the customer-visible body. + pgMutation: (pgRow: TaskRun) => Promise; + // Called when the patch landed cleanly on the buffer snapshot. The + // drainer will see the patched payload on its next pop. Receives the + // pre-mutation snapshot entry (the one fetched for the env auth + // check above) so the caller can compute response details that + // depend on the prior state — e.g. the tags route needs to dedup + // against the existing tags to report an accurate `newTags` count + // matching the PG path, without an extra Redis round-trip. + // `bufferEntry` is `null` in the rare race where the entry didn't + // exist at pre-check time but appeared before `mutateSnapshot`. + synthesisedResponse: (ctx: { + bufferEntry: BufferEntry | null; + }) => TResponse | Promise; + // Called when the buffer rejected the patch as invalid (e.g. an + // `append_tags` patch carrying `maxTags` would exceed the cap). Required + // only by callers that send a rejectable patch; the helper throws if the + // buffer reports a rejection and no builder was supplied. Receives the + // same `bufferEntry` context as `synthesisedResponse` so a rejection + // message can reference the prior state if useful. + rejectedResponse?: (ctx: { + bufferEntry: BufferEntry | null; + }) => TResponse | Promise; + abortSignal?: AbortSignal; + // Override defaults for tests. + safetyNetMs?: number; + pollStepMs?: number; + maxPollStepMs?: number; + // Test injection. + getBuffer?: () => MollifierBuffer | null; + prismaWriter?: TaskRunReader; + prismaReplica?: TaskRunReader; + sleep?: (ms: number) => Promise; + now?: () => number; + // Jitter source; defaults to Math.random. Inject `() => 0` for + // deterministic poll timing in tests. + random?: () => number; +}; + +export type MutateWithFallbackOutcome = + | { kind: "pg"; response: TResponse } + | { kind: "snapshot"; response: TResponse } + | { kind: "rejected"; response: TResponse } + | { kind: "not_found" } + | { kind: "timed_out" }; + +// PG-first → buffer mutateSnapshot → wait-and-bounce. The +// caller decides how to translate the outcome into an HTTP response — +// this helper never throws Response objects so it remains route-agnostic +// and unit-testable in isolation. +export async function mutateWithFallback( + input: MutateWithFallbackInput, +): Promise> { + const replica = input.prismaReplica ?? $replica; + const writer = input.prismaWriter ?? prisma; + const buffer = (input.getBuffer ?? getMollifierBuffer)(); + const sleep = input.sleep ?? defaultSleep; + const now = input.now ?? Date.now; + + // Path 1 — PG is already canonical. + const replicaRow = await findRunInPg(replica, input.runId, input.environmentId); + if (replicaRow) { + const response = await input.pgMutation(replicaRow); + return { kind: "pg", response }; + } + + if (!buffer) { + // No buffer configured (mollifier disabled or boot-time error). PG + // missed; nothing else to consult. + return { kind: "not_found" }; + } + + // Env-scoped authorization for the buffer path. The replica/writer + // lookups above are already env-scoped via findRunInPg; this closes + // the same gap on the buffer side so a caller authed in env A can't + // mutate a buffered run that belongs to env B (or a different org) + // by guessing its friendlyId. Non-atomic w.r.t. the mutateSnapshot + // call below, but the TOCTOU is benign: runIds are globally unique, + // so a cross-env entry can't suddenly appear after a same-env check. + // A genuinely-missing entry (entry === null) falls through and is + // handled by the existing not_found / writer-recovery path below. + const entryForAuth = await buffer.getEntry(input.runId); + if ( + entryForAuth && + (entryForAuth.envId !== input.environmentId || + entryForAuth.orgId !== input.organizationId) + ) { + // Hide existence on env mismatch: return not_found, same shape as + // a true miss, rather than 403 which would leak that the runId + // exists in some other env. + return { kind: "not_found" }; + } + + // Path 2 — buffer snapshot mutation. + const result: MutateSnapshotResult = await buffer.mutateSnapshot( + input.runId, + input.bufferPatch, + ); + + if (result === "applied_to_snapshot") { + return { + kind: "snapshot", + response: await input.synthesisedResponse({ bufferEntry: entryForAuth }), + }; + } + + if (result === "limit_exceeded") { + // The buffer refused the patch (e.g. tag cap). Nothing was written. + // Surface the caller's rejection body; a missing builder means the + // caller sent a rejectable patch without handling the rejection. + if (!input.rejectedResponse) { + throw new Error( + "mutateWithFallback: buffer returned 'limit_exceeded' but no rejectedResponse was provided", + ); + } + return { + kind: "rejected", + response: await input.rejectedResponse({ bufferEntry: entryForAuth }), + }; + } + + if (result === "not_found") { + // Disambiguate a genuine 404 from a replica-lag miss: ask the writer + // directly. If the row just appeared post-drain we route through the + // PG mutation path. + const writerRow = await findRunInPg(writer, input.runId, input.environmentId); + if (writerRow) { + const response = await input.pgMutation(writerRow); + return { kind: "pg", response }; + } + return { kind: "not_found" }; + } + + // result === "busy" — the entry is mid-handoff (DRAINING) or already + // materialised. We do NOT poll the primary for the row to appear: that + // piles read load onto the writer at exactly the moment mollifier exists + // to shed it. Instead we watch the buffer entry itself (cheap Redis + // reads). The drainer writes the PG row BEFORE it acks (sets + // `materialised`) or fails (deletes the entry), so the entry's own state + // is an authoritative, already-in-Redis signal for "is the row in PG + // yet?". Only once it resolves do we touch the primary — exactly once, + // for the real mutation. + const safetyNetMs = input.safetyNetMs ?? DEFAULT_SAFETY_NET_MS; + const maxPollStepMs = input.maxPollStepMs ?? DEFAULT_MAX_POLL_STEP_MS; + const random = input.random ?? Math.random; + const deadline = now() + safetyNetMs; + let step = input.pollStepMs ?? DEFAULT_POLL_STEP_MS; + + while (now() < deadline) { + if (input.abortSignal?.aborted) { + return { kind: "timed_out" }; + } + + const entry = await buffer.getEntry(input.runId); + // Resolved when the entry is gone (`fail` deleted it after writing a + // terminal SYSTEM_FAILURE row) or materialised (`ack` after a + // successful trigger / cancel write). In both cases the PG row is now + // committed on the primary, so read it once and route through the + // canonical PG mutation path. + if (entry === null || entry.materialised === true) { + const row = await findRunInPg(writer, input.runId, input.environmentId); + if (row) { + const response = await input.pgMutation(row); + return { kind: "pg", response }; + } + // Entry gone with no PG row: the drainer's terminal write itself + // failed (PG unreachable). Nothing to mutate. + return { kind: "not_found" }; + } + // Still QUEUED (requeued after a retryable drain error) or DRAINING — + // the run hasn't reached PG. Back off with jitter so concurrent + // waiters on the same draining run don't requery in lockstep. + if (now() >= deadline) break; + const jittered = step + Math.floor(random() * step); + await sleep(jittered); + step = Math.min(Math.ceil(step * BACKOFF_FACTOR), maxPollStepMs); + } + + logger.warn("mollifier mutate-with-fallback: drainer resolution timed out", { + runId: input.runId, + safetyNetMs, + }); + return { kind: "timed_out" }; +} + +// Structural reader interface — accepts both the writer (`prisma`) and the +// replica (`$replica`), which differ slightly in their generated Prisma +// types but share the findFirst surface used here. +type TaskRunReader = { + taskRun: { + findFirst(args: { + where: { friendlyId: string; runtimeEnvironmentId: string }; + }): Promise; + }; +}; + +async function findRunInPg( + client: TaskRunReader, + friendlyId: string, + environmentId: string, +): Promise { + return client.taskRun.findFirst({ + where: { friendlyId, runtimeEnvironmentId: environmentId }, + }); +} + +function defaultSleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/apps/webapp/app/v3/mollifier/resolveRunForMutation.server.ts b/apps/webapp/app/v3/mollifier/resolveRunForMutation.server.ts new file mode 100644 index 0000000000..2808fbe9b2 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/resolveRunForMutation.server.ts @@ -0,0 +1,58 @@ +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { $replica as defaultReplica } from "~/db.server"; +import { getMollifierBuffer as defaultGetBuffer } from "./mollifierBuffer.server"; + +// Discriminated-union resolver used by mutation routes' `findResource`. +// The route builder treats a null return from `findResource` as a 404 +// BEFORE the action handler runs (`apiBuilder.server.ts:321`), so we +// must check BOTH the PG canonical store and the mollifier buffer here +// — otherwise a buffered run can't be cancelled / mutated even though +// the underlying mutateWithFallback flow would handle it correctly. +// +// (Regression: before extracting this helper the cancel route had +// `findResource: async () => null`, which made every cancel 404 before +// the action ran. The helper makes the lookup unit-testable.) +export type ResolvedRunForMutation = + | { source: "pg"; friendlyId: string } + | { source: "buffer"; friendlyId: string }; + +export type ResolveRunForMutationDeps = { + prismaReplica?: { + taskRun: { + findFirst(args: { + where: { friendlyId: string; runtimeEnvironmentId: string }; + select: { friendlyId: true }; + }): Promise<{ friendlyId: string } | null>; + }; + }; + getBuffer?: () => MollifierBuffer | null; +}; + +export async function resolveRunForMutation(input: { + runParam: string; + environmentId: string; + organizationId: string; + deps?: ResolveRunForMutationDeps; +}): Promise { + const replica = input.deps?.prismaReplica ?? defaultReplica; + const getBuffer = input.deps?.getBuffer ?? defaultGetBuffer; + + const pgRun = await replica.taskRun.findFirst({ + where: { friendlyId: input.runParam, runtimeEnvironmentId: input.environmentId }, + select: { friendlyId: true }, + }); + if (pgRun) return { source: "pg", friendlyId: pgRun.friendlyId }; + + const buffer = getBuffer(); + if (!buffer) return null; + + const entry = await buffer.getEntry(input.runParam); + if ( + entry && + entry.envId === input.environmentId && + entry.orgId === input.organizationId + ) { + return { source: "buffer", friendlyId: input.runParam }; + } + return null; +} diff --git a/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts b/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts index 9568499930..e7890867ca 100644 --- a/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts +++ b/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts @@ -1,6 +1,7 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; import { logger } from "~/services/logger.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; export class ResetIdempotencyKeyService extends BaseService { public async call( @@ -8,7 +9,7 @@ export class ResetIdempotencyKeyService extends BaseService { taskIdentifier: string, authenticatedEnv: AuthenticatedEnvironment ): Promise<{ id: string }> { - const { count } = await this._prisma.taskRun.updateMany({ + const { count: pgCount } = await this._prisma.taskRun.updateMany({ where: { idempotencyKey, taskIdentifier, @@ -20,7 +21,48 @@ export class ResetIdempotencyKeyService extends BaseService { }, }); - if (count === 0) { + // Buffer-side reset: the key may belong to a buffered run that + // hasn't materialised yet. The PG updateMany above can't see it. + // resetIdempotency clears both the snapshot fields and the Redis + // lookup atomically. Returns null when nothing was bound there. + const buffer = getMollifierBuffer(); + let bufferResetFailed = false; + const bufferResult = buffer + ? await buffer + .resetIdempotency({ + envId: authenticatedEnv.id, + taskIdentifier, + idempotencyKey, + }) + .catch((err) => { + // Don't drop a buffer outage on the floor. We log + flag so + // the 404 branch below can distinguish "no record anywhere" + // (legitimate not-found) from "PG cleared nothing AND we + // couldn't see the buffer" (partial outage — caller should + // retry, not be told "doesn't exist"). + bufferResetFailed = true; + logger.error("ResetIdempotencyKeyService: buffer reset failed", { + idempotencyKey, + taskIdentifier, + err: err instanceof Error ? err.message : String(err), + }); + return { clearedRunId: null }; + }) + : { clearedRunId: null }; + + const totalCount = pgCount + (bufferResult.clearedRunId ? 1 : 0); + + if (pgCount === 0 && bufferResetFailed) { + // PG saw nothing AND the buffer is unreachable. We can't truthfully + // say "not found" — there may be a buffered run we can't observe. + // Surface as 503 so the caller retries instead of being misled. + throw new ServiceValidationError( + "Unable to verify buffered idempotency state right now; please retry", + 503 + ); + } + + if (totalCount === 0) { throw new ServiceValidationError( `No runs found with idempotency key: ${idempotencyKey} and task: ${taskIdentifier}`, 404 @@ -28,7 +70,7 @@ export class ResetIdempotencyKeyService extends BaseService { } logger.info( - `Reset idempotency key: ${idempotencyKey} for task: ${taskIdentifier} in env: ${authenticatedEnv.id}, affected ${count} run(s)` + `Reset idempotency key: ${idempotencyKey} for task: ${taskIdentifier} in env: ${authenticatedEnv.id}, affected ${totalCount} run(s) (pg=${pgCount}, buffered=${bufferResult.clearedRunId ? 1 : 0})` ); return { id: idempotencyKey }; diff --git a/apps/webapp/test/mollifierApplyMetadataMutation.test.ts b/apps/webapp/test/mollifierApplyMetadataMutation.test.ts new file mode 100644 index 0000000000..74920ad66b --- /dev/null +++ b/apps/webapp/test/mollifierApplyMetadataMutation.test.ts @@ -0,0 +1,290 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server"; +import type { BufferEntry, MollifierBuffer, CasSetMetadataResult } from "@trigger.dev/redis-worker"; + +// Regression for a CAS retry-exhaustion bug: the default `maxRetries` +// was 3, matching the PG-side service, but that exhausts fast when N +// external API writers race the same buffered run's metadata. Bumped +// to 12 + jittered backoff. These tests simulate version_conflict +// races and assert (a) every delta lands and (b) the retry budget is +// sized for realistic concurrency. + +const NOW = new Date("2026-05-21T10:00:00Z"); + +type BufferStub = { + buffer: MollifierBuffer; + state: { + version: number; + metadata: Record; + pendingConflictsForNextN: number; + }; +}; + +// Build a stub MollifierBuffer that simulates Lua-CAS semantics +// in-memory. The first `pendingConflictsForNextN` casSetMetadata calls +// from any worker will return version_conflict (then the version +// bumps); subsequent calls succeed. +function makeBufferStub(initialPayload: Record = {}): BufferStub { + const state = { + version: 0, + metadata: initialPayload.metadata + ? (JSON.parse(initialPayload.metadata as string) as Record) + : {}, + pendingConflictsForNextN: 0, + }; + const entryTemplate: Omit = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + status: "QUEUED", + attempts: 0, + createdAt: NOW, + createdAtMicros: 1747044000000000, + materialised: false, + idempotencyLookupKey: "", + metadataVersion: 0, + }; + + const buffer: MollifierBuffer = { + getEntry: vi.fn(async (): Promise => ({ + ...entryTemplate, + metadataVersion: state.version, + payload: JSON.stringify({ ...initialPayload, metadata: JSON.stringify(state.metadata) }), + })), + casSetMetadata: vi.fn( + async (input: { + runId: string; + expectedVersion: number; + newMetadata: string; + newMetadataType: string; + }): Promise => { + // Inject a controlled number of conflicts to simulate races. + if (state.pendingConflictsForNextN > 0) { + state.pendingConflictsForNextN -= 1; + // Bump version as if some other writer just landed. + state.version += 1; + return { kind: "version_conflict", currentVersion: state.version }; + } + if (input.expectedVersion !== state.version) { + return { kind: "version_conflict", currentVersion: state.version }; + } + state.metadata = JSON.parse(input.newMetadata) as Record; + state.version += 1; + return { kind: "applied", newVersion: state.version }; + }, + ), + } as unknown as MollifierBuffer; + + return { buffer, state }; +} + +describe("applyMetadataMutationToBufferedRun — retry behaviour", () => { + it("succeeds when CAS lands on the first try (no contention)", async () => { + const { buffer, state } = makeBufferStub(); + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + environmentId: "env_a", + organizationId: "org_1", + maximumSize: 1024 * 1024, + body: { metadata: { counter: 1 } }, + buffer, + }); + expect(result.kind).toBe("applied"); + expect(state.metadata).toEqual({ counter: 1 }); + expect(state.version).toBe(1); + }); + + it("succeeds after 5 version conflicts (default budget = 12)", async () => { + const { buffer, state } = makeBufferStub(); + state.pendingConflictsForNextN = 5; + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + environmentId: "env_a", + organizationId: "org_1", + maximumSize: 1024 * 1024, + body: { operations: [{ type: "increment", key: "counter", value: 1 }] }, + buffer, + }); + expect(result.kind).toBe("applied"); + if (result.kind === "applied") { + expect(result.newMetadata.counter).toBe(1); + } + }); + + it("succeeds after 11 version conflicts (one under the default budget)", async () => { + const { buffer } = makeBufferStub(); + const setStateConflicts = (n: number) => { + // Re-read state from the closure + const state = (buffer as unknown as { __state__?: never; getEntry: () => Promise }); + void state; + }; + void setStateConflicts; + // Set conflicts directly via the shared state object + const { state } = makeBufferStub(); + state.pendingConflictsForNextN = 11; + // Build a fresh stub since we want one shared state instance + const stub = makeBufferStub(); + stub.state.pendingConflictsForNextN = 11; + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + environmentId: "env_a", + organizationId: "org_1", + maximumSize: 1024 * 1024, + body: { operations: [{ type: "increment", key: "counter", value: 1 }] }, + buffer: stub.buffer, + }); + expect(result.kind).toBe("applied"); + }); + + it("returns version_exhausted after retries are spent", async () => { + const stub = makeBufferStub(); + // 99 conflicts ≫ default budget of 12. With maxRetries 3 (the + // pre-fix value), this would have exhausted after 4 attempts. + stub.state.pendingConflictsForNextN = 99; + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + environmentId: "env_a", + organizationId: "org_1", + maximumSize: 1024 * 1024, + body: { operations: [{ type: "increment", key: "counter", value: 1 }] }, + buffer: stub.buffer, + maxRetries: 12, + }); + expect(result.kind).toBe("version_exhausted"); + }); + + it("regression: 3 retries are NOT enough under 50-way concurrency simulation", async () => { + // The pre-fix default would have lost most deltas under this + // contention. Asserting that the OLD budget (3) exhausts confirms + // the regression actually existed and the new budget addresses it. + const stub = makeBufferStub(); + stub.state.pendingConflictsForNextN = 8; + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + environmentId: "env_a", + organizationId: "org_1", + maximumSize: 1024 * 1024, + body: { operations: [{ type: "increment", key: "counter", value: 1 }] }, + buffer: stub.buffer, + maxRetries: 3, + }); + expect(result.kind).toBe("version_exhausted"); + }); + + it("matches PG semantics when body has both metadata + operations: ops on top of EXISTING, body.metadata ignored", async () => { + // PG service (UpdateMetadataService.#updateRunMetadata) branches on + // Array.isArray(body.operations) — when present it applies ops on + // top of existing PG metadata and IGNORES body.metadata. The buffer + // helper used to merge both (replace then apply), producing different + // results across the buffered/materialised boundary. This regression + // pins the PG-matching behaviour. + const stub = makeBufferStub({ metadata: JSON.stringify({ a: 1 }) }); + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + environmentId: "env_a", + organizationId: "org_1", + maximumSize: 1024 * 1024, + body: { + // Should be ignored because `operations` is also present. + metadata: { b: 2 }, + operations: [{ type: "set", key: "c", value: 3 }], + }, + buffer: stub.buffer, + }); + expect(result.kind).toBe("applied"); + if (result.kind === "applied") { + // PG would produce {a:1, c:3}; previously the buffer produced {b:2, c:3}. + expect(result.newMetadata).toEqual({ a: 1, c: 3 }); + expect(result.newMetadata).not.toHaveProperty("b"); + } + }); + + it("returns metadata_too_large when the resulting payload exceeds maximumSize (mirrors PG 413)", async () => { + // PG-side `UpdateMetadataService` uses `handleMetadataPacket` to + // enforce TASK_RUN_METADATA_MAXIMUM_SIZE (default 256KB), throwing + // `MetadataTooLargeError` (413) on overflow. The buffer helper now + // matches that cap so a buffered run can't accept a payload PG + // would have rejected. Reject must fire BEFORE casSetMetadata. + const stub = makeBufferStub(); + const big = "x".repeat(2048); // 2 KB string value + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + environmentId: "env_a", + organizationId: "org_1", + maximumSize: 1024, // 1 KB cap — strictly less than the payload + body: { metadata: { big } }, + buffer: stub.buffer, + }); + expect(result.kind).toBe("metadata_too_large"); + if (result.kind === "metadata_too_large") { + expect(result.maximumSize).toBe(1024); + expect(result.observedSize).toBeGreaterThan(1024); + } + // No CAS write should have been attempted. + expect(stub.buffer.casSetMetadata).not.toHaveBeenCalled(); + expect(stub.state.version).toBe(0); + }); + + it("returns not_found when the buffered entry belongs to a different env (cross-env auth gate)", async () => { + // Same shape as a normal apply call, but the caller's environmentId + // doesn't match the entry's envId. The helper must refuse the + // mutation and return not_found (without leaking existence) and + // must NOT call casSetMetadata. + const stub = makeBufferStub(); + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + environmentId: "env_OTHER", + organizationId: "org_1", + maximumSize: 1024 * 1024, + body: { metadata: { counter: 1 } }, + buffer: stub.buffer, + }); + expect(result.kind).toBe("not_found"); + expect(stub.buffer.casSetMetadata).not.toHaveBeenCalled(); + expect(stub.state.version).toBe(0); + }); + + it("returns not_found when the buffered entry belongs to a different org (cross-org auth gate)", async () => { + const stub = makeBufferStub(); + const result = await applyMetadataMutationToBufferedRun({ + runId: "run_1", + environmentId: "env_a", + organizationId: "org_OTHER", + maximumSize: 1024 * 1024, + body: { metadata: { counter: 1 } }, + buffer: stub.buffer, + }); + expect(result.kind).toBe("not_found"); + expect(stub.buffer.casSetMetadata).not.toHaveBeenCalled(); + }); + + it("N-way concurrent applies all converge under default budget", async () => { + // Simulate N parallel writers against a shared state. Each writer + // reads, applies a delta, CAS-writes. The Lua CAS forces them to + // retry until they see the latest version. + const N = 30; + const sharedStub = makeBufferStub(); + // Override the stub to model real per-attempt serialisation: each + // call reads the latest version, and CAS conflicts are organic + // (not pre-injected) when expectedVersion != current. + sharedStub.state.pendingConflictsForNextN = 0; + + const calls = Array.from({ length: N }, () => + applyMetadataMutationToBufferedRun({ + runId: "run_1", + environmentId: "env_a", + organizationId: "org_1", + maximumSize: 1024 * 1024, + body: { operations: [{ type: "increment", key: "counter", value: 1 }] }, + buffer: sharedStub.buffer, + }), + ); + const results = await Promise.all(calls); + const applied = results.filter((r) => r.kind === "applied").length; + expect(applied).toBe(N); + expect(sharedStub.state.metadata.counter).toBe(N); + }); +}); diff --git a/apps/webapp/test/mollifierMutateWithFallback.test.ts b/apps/webapp/test/mollifierMutateWithFallback.test.ts new file mode 100644 index 0000000000..500a896d73 --- /dev/null +++ b/apps/webapp/test/mollifierMutateWithFallback.test.ts @@ -0,0 +1,447 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ + prisma: { taskRun: { findFirst: vi.fn(async () => null) } }, + $replica: { taskRun: { findFirst: vi.fn(async () => null) } }, +})); + +import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server"; +import type { + BufferEntry, + MollifierBuffer, + MutateSnapshotResult, +} from "@trigger.dev/redis-worker"; +import type { TaskRun } from "@trigger.dev/database"; + +type FindFirst = ReturnType; +type PrismaStub = { taskRun: { findFirst: FindFirst } }; + +function fakePrisma(rows: Array): PrismaStub { + const fn = vi.fn(); + for (const r of rows) fn.mockResolvedValueOnce(r); + fn.mockResolvedValue(null); + return { taskRun: { findFirst: fn } }; +} + +// Env-matching entry returned by the env-pre-check getEntry call that +// mutateWithFallback now does before any buffer write (cross-env auth +// gate). Same envId/orgId as `baseInput` so the check passes and the +// flow under test proceeds to mutateSnapshot. +const preCheckEntry = (): BufferEntry => + ({ + envId: "env_a", + orgId: "org_1", + status: "QUEUED", + materialised: false, + }) as unknown as BufferEntry; + +function bufferReturning(result: MutateSnapshotResult): MollifierBuffer { + const getEntry = vi.fn(async () => preCheckEntry()); + return { + mutateSnapshot: vi.fn(async () => result), + getEntry, + } as unknown as MollifierBuffer; +} + +// Buffer whose mutateSnapshot returns "busy" and whose getEntry walks a +// scripted sequence of entry states. The pre-check getEntry call (one +// extra read before the busy-wait loop, used for env authorization) +// consumes the first scripted result, then the busy-wait loop pops the +// remainder; the last element repeats once the sequence is exhausted. +function bufferBusy(entries: Array): MollifierBuffer { + const getEntry = vi.fn(); + // Pre-check consumes one entry. Use a QUEUED env-matching entry so + // the env-check passes and the flow reaches mutateSnapshot (which + // returns "busy") and enters the wait-loop. + getEntry.mockResolvedValueOnce(preCheckEntry()); + for (const e of entries) getEntry.mockResolvedValueOnce(e); + getEntry.mockResolvedValue(entries.length ? entries[entries.length - 1] : null); + return { + mutateSnapshot: vi.fn(async () => "busy" as const), + getEntry, + } as unknown as MollifierBuffer; +} + +const entryDraining = (): BufferEntry => + ({ + envId: "env_a", + orgId: "org_1", + status: "DRAINING", + materialised: false, + }) as unknown as BufferEntry; +const entryQueued = (): BufferEntry => + ({ + envId: "env_a", + orgId: "org_1", + status: "QUEUED", + materialised: false, + }) as unknown as BufferEntry; +const entryMaterialised = (): BufferEntry => + ({ + envId: "env_a", + orgId: "org_1", + status: "DRAINING", + materialised: true, + }) as unknown as BufferEntry; + +const fakeRun = (overrides: Partial = {}): TaskRun => + ({ + id: "pg_id", + friendlyId: "run_1", + runtimeEnvironmentId: "env_a", + ...overrides, + }) as TaskRun; + +const baseInput = { + runId: "run_1", + environmentId: "env_a", + organizationId: "org_1", + bufferPatch: { type: "append_tags" as const, tags: ["x"] }, +}; + +describe("mutateWithFallback", () => { + it("hits replica → calls pgMutation, returns pg outcome", async () => { + const row = fakeRun(); + const pgMutation = vi.fn(async () => "pg-response"); + const synthesisedResponse = vi.fn(() => "snapshot-response"); + + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse, + prismaReplica: fakePrisma([row]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("applied_to_snapshot"), + }); + + expect(result).toEqual({ kind: "pg", response: "pg-response" }); + expect(pgMutation).toHaveBeenCalledWith(row); + expect(synthesisedResponse).not.toHaveBeenCalled(); + }); + + it("replica miss + buffer applied_to_snapshot → synthesisedResponse", async () => { + const pgMutation = vi.fn(async () => "pg"); + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("applied_to_snapshot"), + }); + expect(result).toEqual({ kind: "snapshot", response: "snap" }); + expect(pgMutation).not.toHaveBeenCalled(); + }); + + it("applied_to_snapshot forwards the pre-mutation entry to synthesisedResponse (lets callers dedup)", async () => { + // The tags route uses this to compute the same post-dedup count + // the PG path reports, without an extra Redis round-trip. + const synthesised = vi.fn(({ bufferEntry }: { bufferEntry: BufferEntry | null }) => { + // Caller can inspect bufferEntry.payload (or other fields) to + // produce a response that depends on the prior snapshot state. + return bufferEntry ? "snap-with-entry" : "snap-without-entry"; + }); + const result = await mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: synthesised, + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("applied_to_snapshot"), + }); + expect(result).toEqual({ kind: "snapshot", response: "snap-with-entry" }); + expect(synthesised).toHaveBeenCalledTimes(1); + const ctx = synthesised.mock.calls[0]?.[0]; + expect(ctx?.bufferEntry).not.toBeNull(); + // The pre-check entry has the env-matching shape set up by + // bufferReturning() / preCheckEntry(). + expect(ctx?.bufferEntry?.envId).toBe("env_a"); + expect(ctx?.bufferEntry?.orgId).toBe("org_1"); + }); + + it("replica miss + buffer not_found + writer miss → not_found", async () => { + const result = await mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([null]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("not_found"), + }); + expect(result).toEqual({ kind: "not_found" }); + }); + + it("replica miss + buffer not_found + writer hit → pgMutation (replica-lag recovery)", async () => { + const row = fakeRun({ friendlyId: "run_1" }); + const pgMutation = vi.fn(async () => "pg-recovered"); + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([row]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("not_found"), + }); + expect(result).toEqual({ kind: "pg", response: "pg-recovered" }); + expect(pgMutation).toHaveBeenCalledWith(row); + }); + + it("busy → watches buffer through DRAINING, materialises, hits primary exactly once", async () => { + const row = fakeRun(); + const pgMutation = vi.fn(async () => "pg-after-wait"); + // Writer is read ONCE, only after the buffer reports materialised. + const writer = fakePrisma([row]); + const buffer = bufferBusy([entryDraining(), entryDraining(), entryMaterialised()]); + let nowValue = 0; + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: writer as unknown as typeof import("~/db.server").prisma, + getBuffer: () => buffer, + sleep: async (ms) => { + nowValue += ms; + }, + now: () => nowValue, + safetyNetMs: 2000, + pollStepMs: 20, + random: () => 0, + }); + expect(result).toEqual({ kind: "pg", response: "pg-after-wait" }); + expect(pgMutation).toHaveBeenCalledWith(row); + // One env-pre-check call + 3 busy-wait polls = 4 getEntry reads; + // primary read exactly once. + expect(buffer.getEntry).toHaveBeenCalledTimes(4); + expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1); + }); + + it("busy → entry deleted by terminal fail, writer finds SYSTEM_FAILURE row → pgMutation", async () => { + const row = fakeRun(); + const pgMutation = vi.fn(async () => "pg-failed-row"); + const writer = fakePrisma([row]); + const buffer = bufferBusy([entryDraining(), null]); + let nowValue = 0; + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: writer as unknown as typeof import("~/db.server").prisma, + getBuffer: () => buffer, + sleep: async (ms) => { + nowValue += ms; + }, + now: () => nowValue, + safetyNetMs: 2000, + pollStepMs: 20, + random: () => 0, + }); + expect(result).toEqual({ kind: "pg", response: "pg-failed-row" }); + expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1); + }); + + it("busy → entry deleted but no PG row (terminal write failed) → not_found", async () => { + const buffer = bufferBusy([null]); + const writer = fakePrisma([null]); + let nowValue = 0; + const result = await mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: writer as unknown as typeof import("~/db.server").prisma, + getBuffer: () => buffer, + sleep: async (ms) => { + nowValue += ms; + }, + now: () => nowValue, + safetyNetMs: 2000, + pollStepMs: 20, + random: () => 0, + }); + expect(result).toEqual({ kind: "not_found" }); + expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1); + }); + + it("busy → requeued (back to QUEUED) then materialises; doesn't resolve early", async () => { + const row = fakeRun(); + const pgMutation = vi.fn(async () => "pg-after-requeue"); + const writer = fakePrisma([row]); + // QUEUED (requeued after a retryable drain error) must NOT be treated + // as "done" — the run hasn't reached PG. Only the later materialise does. + const buffer = bufferBusy([entryQueued(), entryDraining(), entryMaterialised()]); + let nowValue = 0; + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: writer as unknown as typeof import("~/db.server").prisma, + getBuffer: () => buffer, + sleep: async (ms) => { + nowValue += ms; + }, + now: () => nowValue, + safetyNetMs: 2000, + pollStepMs: 20, + random: () => 0, + }); + expect(result).toEqual({ kind: "pg", response: "pg-after-requeue" }); + // One env-pre-check + 3 busy-wait polls. + expect(buffer.getEntry).toHaveBeenCalledTimes(4); + expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1); + }); + + it("busy → drainer never resolves (stays DRAINING) → timed_out, primary never touched", async () => { + const writer = fakePrisma([]); + const buffer = bufferBusy([entryDraining()]); + let nowValue = 0; + const result = await mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: writer as unknown as typeof import("~/db.server").prisma, + getBuffer: () => buffer, + sleep: async (ms) => { + nowValue += ms; + }, + now: () => nowValue, + safetyNetMs: 100, + pollStepMs: 20, + random: () => 0, + }); + expect(result).toEqual({ kind: "timed_out" }); + // The whole point: while the run is still draining we never read the primary. + expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(0); + }); + + it("abort signal during wait → timed_out without further polls", async () => { + const writer = fakePrisma([]); + const buffer = bufferBusy([entryDraining(), entryDraining()]); + const controller = new AbortController(); + let nowValue = 0; + const result = await mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: writer as unknown as typeof import("~/db.server").prisma, + getBuffer: () => buffer, + sleep: async (ms) => { + nowValue += ms; + controller.abort(); + }, + now: () => nowValue, + safetyNetMs: 2000, + pollStepMs: 20, + random: () => 0, + abortSignal: controller.signal, + }); + expect(result).toEqual({ kind: "timed_out" }); + // One env-pre-check + one busy-wait poll before sleep+abort; primary untouched. + expect(buffer.getEntry).toHaveBeenCalledTimes(2); + expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(0); + }); + + it("replica miss + buffer limit_exceeded → rejected via rejectedResponse builder", async () => { + const pgMutation = vi.fn(async () => "pg"); + const synthesisedResponse = vi.fn(() => "snap"); + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse, + rejectedResponse: () => "too-many-tags", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("limit_exceeded"), + }); + expect(result).toEqual({ kind: "rejected", response: "too-many-tags" }); + expect(pgMutation).not.toHaveBeenCalled(); + expect(synthesisedResponse).not.toHaveBeenCalled(); + }); + + it("buffer limit_exceeded without a rejectedResponse builder → throws (programmer error)", async () => { + await expect( + mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("limit_exceeded"), + }) + ).rejects.toThrow(/limit_exceeded/); + }); + + it("replica miss + buffer entry belongs to a different env → not_found (cross-env auth gate)", async () => { + // Same flow as the applied_to_snapshot test, except the entry's + // envId doesn't match input.environmentId. mutateWithFallback must + // refuse the write and return not_found (without leaking that the + // runId exists in another env), and must NOT call mutateSnapshot. + const crossEnvEntry: BufferEntry = { + envId: "env_OTHER", + orgId: "org_1", + status: "QUEUED", + materialised: false, + } as unknown as BufferEntry; + const mutateSnapshot = vi.fn(async () => "applied_to_snapshot" as const); + const buffer = { + mutateSnapshot, + getEntry: vi.fn(async () => crossEnvEntry), + } as unknown as MollifierBuffer; + + const pgMutation = vi.fn(async () => "pg"); + const synthesisedResponse = vi.fn(() => "snap"); + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse, + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => buffer, + }); + expect(result).toEqual({ kind: "not_found" }); + expect(mutateSnapshot).not.toHaveBeenCalled(); + expect(pgMutation).not.toHaveBeenCalled(); + expect(synthesisedResponse).not.toHaveBeenCalled(); + }); + + it("replica miss + buffer entry belongs to a different org → not_found (cross-org auth gate)", async () => { + const crossOrgEntry: BufferEntry = { + envId: "env_a", + orgId: "org_OTHER", + status: "QUEUED", + materialised: false, + } as unknown as BufferEntry; + const mutateSnapshot = vi.fn(async () => "applied_to_snapshot" as const); + const buffer = { + mutateSnapshot, + getEntry: vi.fn(async () => crossOrgEntry), + } as unknown as MollifierBuffer; + + const result = await mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => buffer, + }); + expect(result).toEqual({ kind: "not_found" }); + expect(mutateSnapshot).not.toHaveBeenCalled(); + }); + + it("buffer is null (mollifier disabled) → not_found after replica miss", async () => { + const result = await mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => null, + }); + expect(result).toEqual({ kind: "not_found" }); + }); +}); diff --git a/apps/webapp/test/mollifierResetIdempotencyKey.test.ts b/apps/webapp/test/mollifierResetIdempotencyKey.test.ts new file mode 100644 index 0000000000..2fd61e1eab --- /dev/null +++ b/apps/webapp/test/mollifierResetIdempotencyKey.test.ts @@ -0,0 +1,109 @@ +import { describe, expect, it, vi } from "vitest"; + +// Mock the db module so the BaseService default prisma doesn't try to +// open a real connection at module load. Each test wires its own +// prisma stub. +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); +// Prevent the runEngine singleton from instantiating and spinning up +// PG/Redis workers at module load — without this CI fails with +// unhandled `PrismaClientInitializationError`s even though the +// assertions all pass (see `mollifierDrainerWorker.test.ts`). +vi.mock("~/v3/runEngine.server", () => ({ engine: {} })); + +// Hoisted mock state so we can swap the buffer per test without +// re-importing modules. +const bufferMock: { current: unknown } = { current: null }; +vi.mock("~/v3/mollifier/mollifierBuffer.server", () => ({ + getMollifierBuffer: () => bufferMock.current, +})); + +import { ResetIdempotencyKeyService } from "~/v3/services/resetIdempotencyKey.server"; +import { ServiceValidationError } from "~/v3/services/baseService.server"; + +type FakePrisma = { + taskRun: { updateMany: (...args: unknown[]) => Promise<{ count: number }> }; +}; + +function makePrisma(pgCount: number): FakePrisma { + return { + taskRun: { + updateMany: vi.fn(async () => ({ count: pgCount })), + }, + }; +} + +const env = { + id: "env_a", + organizationId: "org_1", +} as unknown as Parameters[2]; + +describe("ResetIdempotencyKeyService — buffer-outage handling", () => { + it("returns success when PG cleared >=1 run, even if the buffer reset throws", async () => { + bufferMock.current = { + resetIdempotency: vi.fn(async () => { + throw new Error("ECONNREFUSED"); + }), + }; + const prisma = makePrisma(1); + const service = new ResetIdempotencyKeyService(prisma as never); + + const result = await service.call("ikey", "task", env); + expect(result).toEqual({ id: "ikey" }); + }); + + it("returns success when PG cleared nothing but the buffer cleared a run", async () => { + bufferMock.current = { + resetIdempotency: vi.fn(async () => ({ clearedRunId: "run_x" })), + }; + const prisma = makePrisma(0); + const service = new ResetIdempotencyKeyService(prisma as never); + + const result = await service.call("ikey", "task", env); + expect(result).toEqual({ id: "ikey" }); + }); + + it("404s when PG and buffer both legitimately report 'nothing to clear'", async () => { + bufferMock.current = { + resetIdempotency: vi.fn(async () => ({ clearedRunId: null })), + }; + const prisma = makePrisma(0); + const service = new ResetIdempotencyKeyService(prisma as never); + + await expect(service.call("ikey", "task", env)).rejects.toMatchObject({ + status: 404, + }); + }); + + // Regression for the silent-not-found hazard CodeRabbit flagged: if PG + // sees nothing AND we can't read the buffer (Redis outage), the + // previous behaviour was to 404 — masking a partial outage and + // leaving a buffered key effectively un-reset while the caller was + // told "doesn't exist." We now surface 503 so the caller retries. + it("503s when PG cleared nothing AND the buffer reset failed (partial outage)", async () => { + bufferMock.current = { + resetIdempotency: vi.fn(async () => { + throw new Error("ECONNREFUSED"); + }), + }; + const prisma = makePrisma(0); + const service = new ResetIdempotencyKeyService(prisma as never); + + const error = await service.call("ikey", "task", env).then( + () => null, + (err) => err, + ); + expect(error).toBeInstanceOf(ServiceValidationError); + expect(error.status).toBe(503); + expect(error.message).toMatch(/retry/i); + }); + + it("404s normally when buffer is null (mollifier disabled) and PG cleared nothing", async () => { + bufferMock.current = null; + const prisma = makePrisma(0); + const service = new ResetIdempotencyKeyService(prisma as never); + + await expect(service.call("ikey", "task", env)).rejects.toMatchObject({ + status: 404, + }); + }); +}); diff --git a/apps/webapp/test/mollifierResolveRunForMutation.test.ts b/apps/webapp/test/mollifierResolveRunForMutation.test.ts new file mode 100644 index 0000000000..c552a3cd18 --- /dev/null +++ b/apps/webapp/test/mollifierResolveRunForMutation.test.ts @@ -0,0 +1,154 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: { taskRun: { findFirst: vi.fn(async () => null) } }, +})); + +import { resolveRunForMutation } from "~/v3/mollifier/resolveRunForMutation.server"; +import type { BufferEntry, MollifierBuffer } from "@trigger.dev/redis-worker"; + +// Regression coverage for the cancel-route 404 bug (commit b490afe23). +// Before the fix the route had `findResource: async () => null`, which +// caused the route builder to 404 every cancel — including for valid +// PG-row runs — BEFORE the action handler could run. The helper +// resolveRunForMutation has to return a non-null discriminated value +// whenever the run exists in either store. + +const NOW = new Date("2026-05-21T10:00:00Z"); + +function fakeReplica(row: { friendlyId: string } | null) { + return { taskRun: { findFirst: vi.fn(async () => row) } }; +} + +function fakeBuffer(entry: BufferEntry | null): MollifierBuffer { + return { + getEntry: vi.fn(async () => entry), + } as unknown as MollifierBuffer; +} + +const baseInput = { + runParam: "run_1", + environmentId: "env_a", + organizationId: "org_1", +}; + +describe("resolveRunForMutation", () => { + it("returns { source: 'pg' } when the PG row exists", async () => { + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica({ friendlyId: "run_1" }), + getBuffer: () => null, + }, + }); + expect(result).toEqual({ source: "pg", friendlyId: "run_1" }); + }); + + it("returns { source: 'buffer' } when PG misses and the buffer entry matches env+org", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_1", + payload: "{}", + status: "QUEUED", + attempts: 0, + createdAt: NOW, + createdAtMicros: 1747044000000000, + materialised: false, + idempotencyLookupKey: "", + metadataVersion: 0, + }; + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica(null), + getBuffer: () => fakeBuffer(entry), + }, + }); + expect(result).toEqual({ source: "buffer", friendlyId: "run_1" }); + }); + + it("returns null when PG misses and the buffer entry env doesn't match", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_OTHER", + orgId: "org_1", + payload: "{}", + status: "QUEUED", + attempts: 0, + createdAt: NOW, + createdAtMicros: 1747044000000000, + materialised: false, + idempotencyLookupKey: "", + metadataVersion: 0, + }; + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica(null), + getBuffer: () => fakeBuffer(entry), + }, + }); + expect(result).toBeNull(); + }); + + it("returns null when PG misses and the buffer entry org doesn't match", async () => { + const entry: BufferEntry = { + runId: "run_1", + envId: "env_a", + orgId: "org_OTHER", + payload: "{}", + status: "QUEUED", + attempts: 0, + createdAt: NOW, + createdAtMicros: 1747044000000000, + materialised: false, + idempotencyLookupKey: "", + metadataVersion: 0, + }; + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica(null), + getBuffer: () => fakeBuffer(entry), + }, + }); + expect(result).toBeNull(); + }); + + it("returns null when both PG and buffer miss", async () => { + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica(null), + getBuffer: () => fakeBuffer(null), + }, + }); + expect(result).toBeNull(); + }); + + it("returns null when buffer is unavailable (mollifier disabled) and PG misses", async () => { + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica(null), + getBuffer: () => null, + }, + }); + expect(result).toBeNull(); + }); + + it("PG-hit short-circuits before consulting the buffer", async () => { + const buffer = fakeBuffer(null); + const result = await resolveRunForMutation({ + ...baseInput, + deps: { + prismaReplica: fakeReplica({ friendlyId: "run_1" }), + getBuffer: () => buffer, + }, + }); + expect(result?.source).toBe("pg"); + expect(buffer.getEntry).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/redis-worker/src/mollifier/buffer.test.ts b/packages/redis-worker/src/mollifier/buffer.test.ts index 9bd09acef1..b47e41589e 100644 --- a/packages/redis-worker/src/mollifier/buffer.test.ts +++ b/packages/redis-worker/src/mollifier/buffer.test.ts @@ -1937,6 +1937,61 @@ describe("MollifierBuffer.mutateSnapshot", () => { }, ); + redisTest( + "append_tags rejects with limit_exceeded when maxTags would be exceeded, writing nothing", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "r_cap", + envId: "env_m", + orgId: "org_1", + payload: serialiseSnapshot({ tags: ["a", "b"] }), + }); + + // 2 existing + 2 new = 4 deduped > cap of 3 → rejected, nothing written. + const rejected = await buffer.mutateSnapshot("r_cap", { + type: "append_tags", + tags: ["c", "d"], + maxTags: 3, + }); + expect(rejected).toBe("limit_exceeded"); + const afterReject = await buffer.getEntry("r_cap"); + const rejPayload = JSON.parse(afterReject!.payload) as { tags: string[] }; + expect(rejPayload.tags).toEqual(["a", "b"]); + + // Dedup keeps the count under the cap → applied. + const applied = await buffer.mutateSnapshot("r_cap", { + type: "append_tags", + tags: ["a", "c"], + maxTags: 3, + }); + expect(applied).toBe("applied_to_snapshot"); + const afterApply = await buffer.getEntry("r_cap"); + const appPayload = JSON.parse(afterApply!.payload) as { tags: string[] }; + expect(appPayload.tags).toEqual(["a", "b", "c"]); + + // Landing exactly on the cap is allowed. + const exact = await buffer.mutateSnapshot("r_cap", { + type: "append_tags", + tags: ["a", "b", "c"], + maxTags: 3, + }); + expect(exact).toBe("applied_to_snapshot"); + } finally { + await buffer.close(); + } + }, + ); + redisTest( "set_metadata replaces metadata + metadataType (last-write-wins)", { timeout: 20_000 }, diff --git a/packages/redis-worker/src/mollifier/buffer.ts b/packages/redis-worker/src/mollifier/buffer.ts index 9beceb41ea..1793e58eff 100644 --- a/packages/redis-worker/src/mollifier/buffer.ts +++ b/packages/redis-worker/src/mollifier/buffer.ts @@ -37,12 +37,21 @@ export function mollifierReconnectDelayMs( } export type SnapshotPatch = - | { type: "append_tags"; tags: string[] } + // `maxTags`, when set, caps the deduped tag count atomically inside the + // Lua: if appending would push the snapshot over the limit the patch is + // rejected ("limit_exceeded") and nothing is written, mirroring the + // PG-path MAX_TAGS_PER_RUN check so a buffered run can't accumulate more + // tags than the trigger validator would have allowed at creation. + | { type: "append_tags"; tags: string[]; maxTags?: number } | { type: "set_metadata"; metadata: string; metadataType: string } | { type: "set_delay"; delayUntil: string } | { type: "mark_cancelled"; cancelledAt: string; cancelReason?: string }; -export type MutateSnapshotResult = "applied_to_snapshot" | "not_found" | "busy"; +export type MutateSnapshotResult = + | "applied_to_snapshot" + | "not_found" + | "busy" + | "limit_exceeded"; export type CasSetMetadataResult = | { kind: "applied"; newVersion: number } @@ -311,6 +320,8 @@ export class MollifierBuffer { // FAILED entry, whose hash the drainer-terminal `fail` path DELs. // - "busy": entry is DRAINING or materialised. The API // wait-and-bounces through PG. + // - "limit_exceeded": an `append_tags` patch carrying `maxTags` would + // push the deduped tag count over the cap; nothing is written. async mutateSnapshot(runId: string, patch: SnapshotPatch): Promise { const result = (await this.redis.mutateMollifierSnapshot( `mollifier:entries:${runId}`, @@ -319,7 +330,8 @@ export class MollifierBuffer { if ( result === "applied_to_snapshot" || result === "not_found" || - result === "busy" + result === "busy" || + result === "limit_exceeded" ) { return result; } @@ -914,6 +926,12 @@ export class MollifierBuffer { table.insert(merged, t) end end + -- Cap the deduped count when the caller supplies a limit, so a + -- buffered run can't exceed MAX_TAGS_PER_RUN via the tags API. + -- Reject the whole patch (write nothing) rather than truncating. + if patch.maxTags ~= nil and #merged > patch.maxTags then + return 'limit_exceeded' + end payload.tags = merged elseif patch.type == 'set_metadata' then payload.metadata = patch.metadata