Skip to content

Commit 7494c3d

Browse files
d-cs and claude committed
fix(webapp): review fixes for mollifier dashboard parity
Address review findings on PR #3757:

- replay: guard `synthetic.traceId` / `spanId` before adapting a buffered snapshot into the TaskRun shape ReplayTaskRunService expects. Without the guard, an older snapshot missing those fields produces `00-undefined-undefined-01` as the W3C traceparent, which OTel silently drops — severing the replayed run's trace linkage. Extract the adapter into `buildSyntheticReplayTaskRun` so the guard is unit tested.
- run-detail: reflect the buffered snapshot's CANCELED state into the NavBar status + Cancel-button gate. Previously `tryMollifiedRunFallback` hardcoded `status: PENDING` / `isFinished: false` regardless of `buffered.status` / `cancelledAt`, so cancelling a buffered run left the NavBar showing Pending with a Cancel button until the drainer materialised the row. Extract the mapping into `buildSyntheticRunHeader` and unit test PENDING vs CANCELED.
- admin runs redirect (@.runs.$runParam): preselect the root span on the buffered redirect via `v3RunSpanPath`, matching the sibling redirect routes. Without this the trace tree opened with nothing selected for buffered runs reached via the admin impersonate path.
- RunStreamPresenter: switch from `deserialiseSnapshot` (redis-worker) to the webapp wrapper `deserialiseMollifierSnapshot` so both read-side modules share one deserialisation path, per the contract comment in `syntheticRedirectInfo.server.ts`.
- cancel route: collapse the `mutateSnapshot` "not_found" branch into the same retry message as "busy". Both indicate the drainer raced the mutation between `getEntry` and `mutateSnapshot`; the run is in PG by then and a retry hits the regular cancel path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c2e1c6e commit 7494c3d

9 files changed

Lines changed: 637 additions & 34 deletions

File tree

apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { logger } from "~/services/logger.server";
33
import { singleton } from "~/utils/singleton";
44
import { ABORT_REASON_SEND_ERROR, createSSELoader, SendFunction } from "~/utils/sse";
55
import { throttle } from "~/utils/throttle";
6+
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
7+
import { deserialiseMollifierSnapshot } from "~/v3/mollifier/mollifierSnapshot.server";
68
import { tracePubSub } from "~/v3/services/tracePubSub.server";
79

810
const PING_INTERVAL = 5_000;
@@ -37,17 +39,48 @@ export class RunStreamPresenter {
3739
},
3840
});
3941

40-
if (!run) {
42+
// Fall back to the mollifier buffer when the run isn't in PG yet.
43+
// The buffered run has no execution events to stream, but we still
44+
// attach a trace-pubsub subscription using the snapshot's traceId
45+
// so that the moment the drainer materialises the row and execution
46+
// begins, those events flow to this open SSE connection. Closing
47+
// with 404 would force the dashboard to keep retrying.
48+
let traceId: string | null = run?.traceId ?? null;
49+
if (!traceId) {
50+
const buffer = getMollifierBuffer();
51+
if (buffer) {
52+
try {
53+
const entry = await buffer.getEntry(runFriendlyId);
54+
if (entry) {
55+
// Go through the webapp wrapper so this read-side module
56+
// shares a single deserialisation path with readFallback —
57+
// see the contract comment in syntheticRedirectInfo.server.ts.
58+
const snapshot = deserialiseMollifierSnapshot(entry.payload);
59+
if (typeof snapshot.traceId === "string") {
60+
traceId = snapshot.traceId;
61+
}
62+
}
63+
} catch (err) {
64+
logger.warn("RunStreamPresenter buffer fallback failed", {
65+
runFriendlyId,
66+
err: err instanceof Error ? err.message : String(err),
67+
});
68+
}
69+
}
70+
}
71+
72+
if (!traceId) {
4173
throw new Response("Not found", { status: 404 });
4274
}
75+
const resolvedRun = { traceId };
4376

4477
logger.info("RunStreamPresenter.start", {
4578
runFriendlyId,
46-
traceId: run.traceId,
79+
traceId: resolvedRun.traceId,
4780
});
4881

4982
// Subscribe to trace updates
50-
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId);
83+
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(resolvedRun.traceId);
5184

5285
// Only send max every 1 second
5386
const throttledSend = throttle(
@@ -105,7 +138,7 @@ export class RunStreamPresenter {
105138
cleanup: () => {
106139
logger.info("RunStreamPresenter.cleanup", {
107140
runFriendlyId,
108-
traceId: run.traceId,
141+
traceId: resolvedRun.traceId,
109142
});
110143

111144
// Remove message listener
@@ -119,13 +152,13 @@ export class RunStreamPresenter {
119152
.then(() => {
120153
logger.info("RunStreamPresenter.cleanup.unsubscribe succeeded", {
121154
runFriendlyId,
122-
traceId: run.traceId,
155+
traceId: resolvedRun.traceId,
123156
});
124157
})
125158
.catch((error) => {
126159
logger.error("RunStreamPresenter.cleanup.unsubscribe failed", {
127160
runFriendlyId,
128-
traceId: run.traceId,
161+
traceId: resolvedRun.traceId,
129162
error: {
130163
name: error.name,
131164
message: error.message,

apps/webapp/app/routes/@.runs.$runParam.ts

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ import { z } from "zod";
33
import { prisma } from "~/db.server";
44
import { redirectWithErrorMessage } from "~/models/message.server";
55
import { requireUser } from "~/services/session.server";
6-
import { impersonate, rootPath, v3RunPath } from "~/utils/pathBuilder";
6+
import { impersonate, rootPath, v3RunPath, v3RunSpanPath } from "~/utils/pathBuilder";
7+
import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server";
78

89
const ParamsSchema = z.object({
910
runParam: z.string(),
@@ -32,6 +33,7 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
3233
friendlyId: runParam,
3334
},
3435
select: {
36+
spanId: true,
3537
runtimeEnvironment: {
3638
select: {
3739
slug: true,
@@ -51,16 +53,45 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
5153
});
5254

5355
if (!run) {
56+
// Admin impersonation route — bypass org membership so admins can
57+
// open any buffered run by friendlyId, mirroring the existing PG
58+
// behaviour above (no membership filter on the find).
59+
const buffered = await findBufferedRunRedirectInfo({
60+
runFriendlyId: runParam,
61+
userId: user.id,
62+
skipOrgMembershipCheck: true,
63+
});
64+
if (buffered) {
65+
// Preselect the root span so the run-detail trace tree opens with
66+
// the buffered run's span highlighted, matching the sibling
67+
// redirect routes (runs.$runParam.ts, projects.v3.$projectRef…).
68+
const path = buffered.spanId
69+
? v3RunSpanPath(
70+
{ slug: buffered.organizationSlug },
71+
{ slug: buffered.projectSlug },
72+
{ slug: buffered.environmentSlug },
73+
{ friendlyId: runParam },
74+
{ spanId: buffered.spanId }
75+
)
76+
: v3RunPath(
77+
{ slug: buffered.organizationSlug },
78+
{ slug: buffered.projectSlug },
79+
{ slug: buffered.environmentSlug },
80+
{ friendlyId: runParam }
81+
);
82+
return redirect(impersonate(path));
83+
}
5484
return redirectWithErrorMessage(rootPath(), request, "Run doesn't exist", {
5585
ephemeral: false,
5686
});
5787
}
5888

59-
const path = v3RunPath(
89+
const path = v3RunSpanPath(
6090
{ slug: run.project.organization.slug },
6191
{ slug: run.project.slug },
6292
{ slug: run.runtimeEnvironment.slug },
63-
{ friendlyId: runParam }
93+
{ friendlyId: runParam },
94+
{ spanId: run.spanId }
6495
);
6596

6697
return redirect(impersonate(path));

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx

Lines changed: 108 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,14 @@ import { useReplaceSearchParams } from "~/hooks/useReplaceSearchParams";
8888
import { useSearchParams } from "~/hooks/useSearchParam";
8989
import { type Shortcut, useShortcutKeys } from "~/hooks/useShortcutKeys";
9090
import { useHasAdminAccess } from "~/hooks/useUser";
91+
import { env } from "~/env.server";
9192
import { findProjectBySlug } from "~/models/project.server";
9293
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
9394
import { NextRunListPresenter } from "~/presenters/v3/NextRunListPresenter.server";
9495
import { RunEnvironmentMismatchError, RunPresenter } from "~/presenters/v3/RunPresenter.server";
96+
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
97+
import { buildSyntheticRunHeader } from "~/v3/mollifier/syntheticRunHeader.server";
98+
import { buildSyntheticTraceForBufferedRun } from "~/v3/mollifier/syntheticTrace.server";
9599
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
96100
import { getImpersonationId } from "~/services/impersonation.server";
97101
import { logger } from "~/services/logger.server";
@@ -277,6 +281,31 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
277281
);
278282
}
279283

284+
// PG miss → try the mollifier buffer. When the gate diverts a trigger
285+
// the run sits in Redis until the drainer materialises it; without
286+
// this fallback the run-detail page 404s for the brief buffered window
287+
// even though the API has accepted the trigger and returned an id.
288+
const buffered = await tryMollifiedRunFallback({
289+
runFriendlyId: runParam,
290+
organizationSlug,
291+
projectSlug: projectParam,
292+
envSlug: envParam,
293+
userId,
294+
});
295+
296+
if (buffered) {
297+
const parent = await getResizableSnapshot(request, resizableSettings.parent.autosaveId);
298+
const tree = await getResizableSnapshot(request, resizableSettings.tree.autosaveId);
299+
300+
return json({
301+
run: buffered.run,
302+
trace: buffered.trace,
303+
maximumLiveReloadingSetting: env.MAXIMUM_LIVE_RELOADING_EVENTS,
304+
resizable: { parent, tree },
305+
runsList: null,
306+
});
307+
}
308+
280309
throw error;
281310
}
282311

@@ -305,6 +334,39 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
305334
});
306335
};
307336

337+
async function tryMollifiedRunFallback(args: {
338+
runFriendlyId: string;
339+
organizationSlug: string;
340+
projectSlug: string;
341+
envSlug: string;
342+
userId: string;
343+
}) {
344+
const project = await findProjectBySlug(args.organizationSlug, args.projectSlug, args.userId);
345+
if (!project) return null;
346+
const environment = await findEnvironmentBySlug(project.id, args.envSlug, args.userId);
347+
if (!environment) return null;
348+
349+
const buffered = await findRunByIdWithMollifierFallback({
350+
runId: args.runFriendlyId,
351+
environmentId: environment.id,
352+
organizationId: project.organizationId,
353+
});
354+
if (!buffered) return null;
355+
356+
return {
357+
run: buildSyntheticRunHeader({
358+
run: buffered,
359+
environment: {
360+
id: environment.id,
361+
organizationId: project.organizationId,
362+
type: environment.type,
363+
slug: environment.slug,
364+
},
365+
}),
366+
trace: buildSyntheticTraceForBufferedRun(buffered),
367+
};
368+
}
369+
308370
type LoaderData = SerializeFrom<typeof loader>;
309371

310372
export default function Page() {
@@ -407,23 +469,17 @@ export default function Page() {
407469
/>
408470
</Dialog>
409471
{run.isFinished ? null : (
410-
<Dialog key={`cancel-${run.friendlyId}`}>
411-
<DialogTrigger asChild>
412-
<Button variant="danger/small" LeadingIcon={StopCircleIcon} shortcut={{ key: "C" }}>
413-
Cancel run…
414-
</Button>
415-
</DialogTrigger>
416-
<CancelRunDialog
417-
runFriendlyId={run.friendlyId}
418-
redirectPath={v3RunSpanPath(
419-
organization,
420-
project,
421-
environment,
422-
{ friendlyId: run.friendlyId },
423-
{ spanId: run.spanId }
424-
)}
425-
/>
426-
</Dialog>
472+
<ControlledCancelRunDialog
473+
key={`cancel-${run.friendlyId}`}
474+
runFriendlyId={run.friendlyId}
475+
redirectPath={v3RunSpanPath(
476+
organization,
477+
project,
478+
environment,
479+
{ friendlyId: run.friendlyId },
480+
{ spanId: run.spanId }
481+
)}
482+
/>
427483
)}
428484
</PageAccessories>
429485
</NavBar>
@@ -587,6 +643,35 @@ function TraceView({
587643
);
588644
}
589645

646+
// Controlled wrapper around the cancel dialog. Owns the Radix open state
647+
// so the dialog closes itself once the cancel action transitions through
648+
// submission. We can't `<DialogClose asChild>`-wrap the submit button
649+
// because Radix's onClick handler swallows the button's name=value pair
650+
// that the form action depends on for `redirectUrl`.
651+
function ControlledCancelRunDialog({
652+
runFriendlyId,
653+
redirectPath,
654+
}: {
655+
runFriendlyId: string;
656+
redirectPath: string;
657+
}) {
658+
const [open, setOpen] = useState(false);
659+
return (
660+
<Dialog open={open} onOpenChange={setOpen}>
661+
<DialogTrigger asChild>
662+
<Button variant="danger/small" LeadingIcon={StopCircleIcon} shortcut={{ key: "C" }}>
663+
Cancel run…
664+
</Button>
665+
</DialogTrigger>
666+
<CancelRunDialog
667+
runFriendlyId={runFriendlyId}
668+
redirectPath={redirectPath}
669+
onCancelSubmitted={() => setOpen(false)}
670+
/>
671+
</Dialog>
672+
);
673+
}
674+
590675
function NoLogsView({ run, resizable }: Pick<LoaderData, "run" | "resizable">) {
591676
const plan = useCurrentPlan();
592677
const organization = useOrganization();
@@ -616,9 +701,13 @@ function NoLogsView({ run, resizable }: Pick<LoaderData, "run" | "resizable">) {
616701
>
617702
<div className="grid h-full place-items-center">
618703
{daysSinceCompleted === undefined ? (
619-
<InfoPanel variant="info" icon={InformationCircleIcon} title="We delete old logs">
704+
<InfoPanel
705+
variant="info"
706+
icon={InformationCircleIcon}
707+
title="Waiting to start"
708+
>
620709
<Paragraph variant="small">
621-
We tidy up older logs to keep things running smoothly.
710+
This run is queued. Logs will appear here once it begins executing.
622711
</Paragraph>
623712
</InfoPanel>
624713
) : isWithinLogRetention ? (

apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/m
66
import { logger } from "~/services/logger.server";
77
import { requireUserId } from "~/services/session.server";
88
import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";
9+
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
910

1011
export const cancelSchema = z.object({
1112
redirectUrl: z.string(),
@@ -42,15 +43,56 @@ export const action: ActionFunction = async ({ request, params }) => {
4243
},
4344
});
4445

45-
if (!taskRun) {
46+
if (taskRun) {
47+
const cancelRunService = new CancelTaskRunService();
48+
await cancelRunService.call(taskRun);
49+
return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`);
50+
}
51+
52+
// PG miss — try the mollifier buffer. The customer can hit cancel
53+
// on a buffered run from the dashboard during the burst window.
54+
// Snapshot a `mark_cancelled` patch; the drainer's
55+
// bifurcation routes the run to `engine.createCancelledRun` on
56+
// next pop.
57+
const buffer = getMollifierBuffer();
58+
const entry = buffer ? await buffer.getEntry(runParam) : null;
59+
if (!entry) {
4660
submission.error = { runParam: ["Run not found"] };
4761
return json(submission);
4862
}
4963

50-
const cancelRunService = new CancelTaskRunService();
51-
await cancelRunService.call(taskRun);
64+
// Dashboard auth: verify the requesting user is a member of the
65+
// buffered run's org. The API path scopes by env id from the
66+
// authenticated request; the dashboard route uses org-membership
67+
// because the URL doesn't carry an envId.
68+
const member = await prisma.orgMember.findFirst({
69+
where: { userId, organizationId: entry.orgId },
70+
select: { id: true },
71+
});
72+
if (!member) {
73+
submission.error = { runParam: ["Run not found"] };
74+
return json(submission);
75+
}
5276

53-
return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`);
77+
const result = await buffer!.mutateSnapshot(runParam, {
78+
type: "mark_cancelled",
79+
cancelledAt: new Date().toISOString(),
80+
cancelReason: "Canceled by user",
81+
});
82+
if (result === "applied_to_snapshot") {
83+
return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`);
84+
}
85+
// "not_found" or "busy" — both indicate the drainer raced us between
86+
// the getEntry check above and mutateSnapshot. On "not_found" the
87+
// entry was just popped and the PG row is in flight; on "busy" the
88+
// drainer is mid-materialisation. Either way the customer should
89+
// retry — by then the PG row exists and the regular cancel path at
90+
// the top of this action takes over.
91+
return redirectWithErrorMessage(
92+
submission.value.redirectUrl,
93+
request,
94+
"Run is materialising — retry in a moment"
95+
);
5496
} catch (error) {
5597
if (error instanceof Error) {
5698
logger.error("Failed to cancel run", {

0 commit comments

Comments
 (0)