Skip to content

Commit 5fd8611

Browse files
d-csclaude
andcommitted
fix(webapp): forward scheduleId through mollifier buffer synthesis + pin synthesise contract with regression tests
synthesiseFoundRunFromBuffer hardcoded `scheduleId: null`, which dropped the schedule field from the retrieve-API response for any scheduled trigger that landed in the mollifier buffer. Scheduled triggers go through the same TriggerTaskService path as API triggers and the gate doesn't bypass them, so the snapshot does carry scheduleId; the synthesis was just throwing it away. Forward `buffered.scheduleId ?? null` so resolveSchedule() can hydrate the schedule object from PG (the Schedule row exists before the trigger fires). Exported synthesiseFoundRunFromBuffer + the FoundRun type from the presenter and added apps/webapp/test/mollifierSynthesiseFoundRun.test.ts (16 cases). The test file pins the snapshot→FoundRun mapping that previously had no direct coverage — the new scheduleId forwarding plus earlier-session regressions (batch reconstruction, workerQueue default "", FAILED→SYSTEM_FAILURE status mapping, STRING_ERROR shape, defensive metadata coercion, idempotency defaults, execution-state zero defaults). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6a81f13 commit 5fd8611

2 files changed

Lines changed: 213 additions & 3 deletions

File tree

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ type CommonRelatedRun = Prisma.Result<
7373
// extras the route handler reads. Declared explicitly (not inferred via
7474
// ReturnType<typeof findRun>) so findRun can return a synthesised buffered
7575
// run without the type becoming self-referential.
76-
type FoundRun = CommonRelatedRun & {
76+
// Exported so the buffer-synthesis helper below can be unit-tested
77+
// against a stable shape without re-deriving it (FoundRun's exact field
78+
// list is what the buffered run must match for `call()` not to surprise).
79+
export type FoundRun = CommonRelatedRun & {
7780
traceId: string;
7881
payload: string;
7982
payloadType: string;
@@ -587,7 +590,11 @@ function synthesiseMetadata(buffered: SyntheticRun): string | null {
587590
}
588591
}
589592

590-
function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
593+
// Exported for unit testing. Used by `findRun()` above when the
594+
// Postgres lookup misses and the buffer carries the run — keep the shape
595+
// in lockstep with `FoundRun`'s field list so `call()` treats a synthesised
596+
// buffered run identically to a freshly-triggered PG row.
597+
export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
591598
const status: TaskRunStatus = bufferedStatusToTaskRunStatus(buffered.status);
592599

593600
const errorJson: Prisma.JsonValue = buffered.error
@@ -626,7 +633,13 @@ function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
626633
idempotencyKeyOptions: buffered.idempotencyKeyOptions ?? null,
627634
isTest: buffered.isTest,
628635
depth: buffered.depth,
629-
scheduleId: null,
636+
// Scheduled triggers go through the same TriggerTaskService path as
637+
// API triggers and aren't bypassed by the mollifier gate, so a
638+
// scheduled run can land in the buffer with its scheduleId set on the
639+
// snapshot. Forward it so resolveSchedule() can hydrate the `schedule`
640+
// field in the API response instead of silently dropping it until the
641+
// drainer materialises.
642+
scheduleId: buffered.scheduleId ?? null,
630643
lockedToVersion: buffered.lockedToVersion ? { version: buffered.lockedToVersion } : null,
631644
resumeParentOnCompletion: buffered.resumeParentOnCompletion,
632645
// Reconstruct the batch from the snapshot's internal id so a buffered
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} }));
4+
5+
import {
6+
synthesiseFoundRunFromBuffer,
7+
type FoundRun,
8+
} from "~/presenters/v3/ApiRetrieveRunPresenter.server";
9+
import type { SyntheticRun } from "~/v3/mollifier/readFallback.server";
10+
11+
const NOW = new Date("2026-05-24T10:00:00Z");
12+
13+
function makeSyntheticRun(overrides: Partial<SyntheticRun> = {}): SyntheticRun {
14+
return {
15+
id: "run_internal_1",
16+
friendlyId: "run_friendly_1",
17+
status: "QUEUED",
18+
cancelledAt: undefined,
19+
cancelReason: undefined,
20+
delayUntil: undefined,
21+
taskIdentifier: "hello-world",
22+
createdAt: NOW,
23+
payload: '{"hello":"world"}',
24+
payloadType: "application/json",
25+
metadata: undefined,
26+
metadataType: undefined,
27+
seedMetadata: undefined,
28+
seedMetadataType: undefined,
29+
idempotencyKey: undefined,
30+
idempotencyKeyOptions: undefined,
31+
isTest: false,
32+
depth: 0,
33+
ttl: undefined,
34+
tags: ["alpha", "beta"],
35+
runTags: ["alpha", "beta"],
36+
lockedToVersion: undefined,
37+
resumeParentOnCompletion: false,
38+
parentTaskRunId: undefined,
39+
traceId: "trace_1",
40+
spanId: "span_1",
41+
parentSpanId: undefined,
42+
runtimeEnvironmentId: "env_a",
43+
engine: "V2",
44+
workerQueue: undefined,
45+
queue: undefined,
46+
concurrencyKey: undefined,
47+
machinePreset: undefined,
48+
realtimeStreamsVersion: undefined,
49+
maxAttempts: undefined,
50+
maxDurationInSeconds: undefined,
51+
replayedFromTaskRunFriendlyId: undefined,
52+
annotations: undefined,
53+
traceContext: undefined,
54+
scheduleId: undefined,
55+
batchId: undefined,
56+
parentTaskRunFriendlyId: undefined,
57+
rootTaskRunFriendlyId: undefined,
58+
...overrides,
59+
};
60+
}
61+
62+
describe("synthesiseFoundRunFromBuffer", () => {
63+
it("populates internal id and friendlyId so downstream logging keys off the cuid", () => {
64+
const found: FoundRun = synthesiseFoundRunFromBuffer(makeSyntheticRun());
65+
expect(found.id).toBe("run_internal_1");
66+
expect(found.friendlyId).toBe("run_friendly_1");
67+
});
68+
69+
it("forwards scheduleId from the snapshot so resolveSchedule can hydrate the schedule field", () => {
70+
// Regression: scheduleId was previously hardcoded to null, dropping the
71+
// schedule metadata for buffered scheduled runs even though the snapshot
72+
// carries it (readFallback.server.ts extracts snapshot.scheduleId).
73+
const found = synthesiseFoundRunFromBuffer(
74+
makeSyntheticRun({ scheduleId: "schedule_internal_42" })
75+
);
76+
expect(found.scheduleId).toBe("schedule_internal_42");
77+
});
78+
79+
it("leaves scheduleId null when the snapshot has no scheduleId (non-scheduled trigger)", () => {
80+
const found = synthesiseFoundRunFromBuffer(makeSyntheticRun());
81+
expect(found.scheduleId).toBeNull();
82+
});
83+
84+
it("reconstructs batch.friendlyId from snapshot.batchId so batch-scoped JWTs authorise", () => {
85+
// Regression: batch was previously hardcoded to null, so the
86+
// route-authorization callbacks (which read run.batch?.friendlyId)
87+
// skipped pushing the batch resource — a batch-scoped JWT 403'd on
88+
// buffered batched runs.
89+
const found = synthesiseFoundRunFromBuffer(
90+
// BatchId.toFriendlyId encodes the internal id with a "batch_" prefix.
91+
makeSyntheticRun({ batchId: "abcdefghijklmnopqrstuvwx" })
92+
);
93+
expect(found.batch).not.toBeNull();
94+
expect(found.batch!.id).toBe("abcdefghijklmnopqrstuvwx");
95+
expect(found.batch!.friendlyId).toMatch(/^batch_/);
96+
});
97+
98+
it("leaves batch null when the snapshot has no batchId (non-batched run)", () => {
99+
const found = synthesiseFoundRunFromBuffer(makeSyntheticRun());
100+
expect(found.batch).toBeNull();
101+
});
102+
103+
it("defaults workerQueue to '' so createCommonRunStructure coerces region to undefined", () => {
104+
// Regression: workerQueue previously defaulted to "main", which fed
105+
// through `run.workerQueue || undefined` as the API response's
106+
// `region` — advertising a not-yet-assigned region.
107+
const found = synthesiseFoundRunFromBuffer(makeSyntheticRun({ workerQueue: undefined }));
108+
expect(found.workerQueue).toBe("");
109+
});
110+
111+
it("passes through an explicit workerQueue from the snapshot unchanged", () => {
112+
const found = synthesiseFoundRunFromBuffer(
113+
makeSyntheticRun({ workerQueue: "us-east-1" })
114+
);
115+
expect(found.workerQueue).toBe("us-east-1");
116+
});
117+
118+
it("maps buffered FAILED to SYSTEM_FAILURE so the API surfaces the failure", () => {
119+
const found = synthesiseFoundRunFromBuffer(
120+
makeSyntheticRun({
121+
status: "FAILED",
122+
error: { code: "GATE_REJECTED", message: "buffer rejected the run" },
123+
})
124+
);
125+
expect(found.status).toBe("SYSTEM_FAILURE");
126+
expect(found.error).toEqual({
127+
type: "STRING_ERROR",
128+
raw: "GATE_REJECTED: buffer rejected the run",
129+
});
130+
});
131+
132+
it("maps buffered CANCELED to CANCELED with completedAt populated from cancelledAt", () => {
133+
const cancelledAt = new Date("2026-05-24T10:05:00Z");
134+
const found = synthesiseFoundRunFromBuffer(
135+
makeSyntheticRun({ status: "CANCELED", cancelledAt })
136+
);
137+
expect(found.status).toBe("CANCELED");
138+
expect(found.completedAt).toEqual(cancelledAt);
139+
});
140+
141+
it("maps buffered QUEUED to PENDING with no error and no completedAt", () => {
142+
const found = synthesiseFoundRunFromBuffer(makeSyntheticRun({ status: "QUEUED" }));
143+
expect(found.status).toBe("PENDING");
144+
expect(found.error).toBeNull();
145+
expect(found.completedAt).toBeNull();
146+
});
147+
148+
it("passes through a string snapshot.metadata unchanged", () => {
149+
const found = synthesiseFoundRunFromBuffer(
150+
makeSyntheticRun({ metadata: '{"customer":"acme"}' })
151+
);
152+
expect(found.metadata).toBe('{"customer":"acme"}');
153+
});
154+
155+
it("defensively coerces a non-string snapshot.metadata to a JSON string instead of dropping it silently", () => {
156+
// Production never writes non-string metadata, but if the snapshot
157+
// shape drifts we'd rather see the value (with a warn log) than have
158+
// it disappear.
159+
const found = synthesiseFoundRunFromBuffer(
160+
makeSyntheticRun({ metadata: { customer: "acme" } })
161+
);
162+
expect(found.metadata).toBe('{"customer":"acme"}');
163+
});
164+
165+
it("defaults idempotencyKey / idempotencyKeyOptions to null when absent", () => {
166+
const found = synthesiseFoundRunFromBuffer(makeSyntheticRun());
167+
expect(found.idempotencyKey).toBeNull();
168+
expect(found.idempotencyKeyOptions).toBeNull();
169+
});
170+
171+
it("zeroes execution-state fields that aren't meaningful for a buffered run", () => {
172+
const found = synthesiseFoundRunFromBuffer(makeSyntheticRun());
173+
expect(found.startedAt).toBeNull();
174+
expect(found.attempts).toEqual([]);
175+
expect(found.attemptNumber).toBeNull();
176+
expect(found.parentTaskRun).toBeNull();
177+
expect(found.rootTaskRun).toBeNull();
178+
expect(found.childRuns).toEqual([]);
179+
expect(found.output).toBeNull();
180+
expect(found.costInCents).toBe(0);
181+
expect(found.baseCostInCents).toBe(0);
182+
expect(found.usageDurationMs).toBe(0);
183+
});
184+
185+
it("forwards runTags from the snapshot tags array", () => {
186+
const found = synthesiseFoundRunFromBuffer(
187+
makeSyntheticRun({ tags: ["alpha", "beta"] })
188+
);
189+
expect(found.runTags).toEqual(["alpha", "beta"]);
190+
});
191+
192+
it("pins engine to V2 and taskEventStore to taskEvent (only valid values for a buffered run)", () => {
193+
const found = synthesiseFoundRunFromBuffer(makeSyntheticRun());
194+
expect(found.engine).toBe("V2");
195+
expect(found.taskEventStore).toBe("taskEvent");
196+
});
197+
});

0 commit comments

Comments
 (0)