Skip to content

Commit 062bcfa

Browse files
d-csclaude
andcommitted
fix(webapp,run-engine): replay-layer code-review follow-ups
- `isRetryablePgError`: also accept `errorCode === "P1001"` so `PrismaClientInitializationError` (which surfaces P1001 on a different field than `PrismaClientKnownRequestError`) retries. - Drop `envId` from OTel metric labels on `mollifier.realtime_subscriptions.buffered`, `mollifier.stale_entries`, and the `mollifier.stale_entries.current` gauge. `envId` is a banned high-cardinality attribute; the structured warn log alongside each counter tick still carries envId for forensic drill-down. - Stale-sweep test name + comments now match the assertion shape (all three entries stale, not "two stale + one fresh"). - `RunEngine.createCancelledRun` P2002 path now requires the existing row's status to be CANCELED; a non-canceled conflict throws rather than silently reporting success, so the caller can route to `engine.cancelRun()` or skip. - Regression test pins the new conflict guard. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b89da52 commit 062bcfa

6 files changed

Lines changed: 160 additions & 65 deletions

File tree

apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,15 @@ const tracer = trace.getTracer("mollifier-drainer");
1010
export function isRetryablePgError(err: unknown): boolean {
1111
if (!(err instanceof Error)) return false;
1212
const msg = err.message ?? "";
13+
// Prisma surfaces P1001 ("Can't reach database server") via two
14+
// different error classes — `PrismaClientKnownRequestError` exposes
15+
// it as `err.code`, `PrismaClientInitializationError` exposes it as
16+
// `err.errorCode`. Check both so reconnection-time errors retry
17+
// regardless of which class fires.
1318
const code = (err as { code?: string }).code;
19+
const errorCode = (err as { errorCode?: string }).errorCode;
1420
if (code === "P2024") return true;
21+
if (code === "P1001" || errorCode === "P1001") return true;
1522
if (msg.includes("Can't reach database server")) return true;
1623
if (msg.includes("Connection lost")) return true;
1724
if (msg.includes("ECONNRESET")) return true;

apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ export type StaleSweepConfig = {
2222

2323
export type StaleSweepDeps = {
2424
getBuffer?: () => MollifierBuffer | null;
25-
recordStaleEntry?: (envId: string) => void;
25+
// No `envId` arg — `envId` is a high-cardinality metric attribute and
26+
// is intentionally not emitted as a metric label. The structured warn
27+
// log below carries envId for forensic drill-down.
28+
recordStaleEntry?: () => void;
2629
reportStaleEntrySnapshot?: (snapshot: Map<string, number>) => void;
2730
logger?: { warn: (message: string, fields: Record<string, unknown>) => void };
2831
now?: () => number;
@@ -82,7 +85,7 @@ export async function runStaleSweepOnce(
8285
entriesScanned += 1;
8386
const dwellMs = now - entry.createdAt.getTime();
8487
if (dwellMs > config.staleThresholdMs) {
85-
recordStale(envId);
88+
recordStale();
8689
log.warn("mollifier.stale_entry", {
8790
runId: entry.runId,
8891
envId,

apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,21 @@ export const realtimeBufferedSubscriptionsCounter = meter.createCounter(
2929
},
3030
);
3131

32-
export function recordRealtimeBufferedSubscription(envId: string): void {
33-
realtimeBufferedSubscriptionsCounter.add(1, { envId });
32+
// No `envId` attribute — `envId` is a banned high-cardinality metric
33+
// label per the repo's OTel rules. The structured warn log emitted
34+
// alongside the counter tick (in `mollifierStaleSweep.server.ts`)
35+
// carries the envId / orgId / runId for forensic drill-down; the
36+
// metric stays an aggregate.
37+
export function recordRealtimeBufferedSubscription(): void {
38+
realtimeBufferedSubscriptionsCounter.add(1);
3439
}
3540

3641
// Counts buffer entries that have been waiting in the queue ZSET longer
37-
// than the configured stale threshold (typically half of entryTtlSeconds).
38-
// Useful for historical "stale events over time" views, but not directly
39-
// alertable on its own — a single stuck entry observed by N sweep ticks
40-
// adds N to the counter, so `rate()` over an alerting window reflects
41-
// (entries × ticks), not "entries that are stale right now".
42+
// than the configured stale threshold. Useful for historical "stale
43+
// events over time" views, but not directly alertable on its own — a
44+
// single stuck entry observed by N sweep ticks adds N to the counter,
45+
// so `rate()` over an alerting window reflects (entries × ticks), not
46+
// "entries that are stale right now".
4247
export const staleEntriesCounter = meter.createCounter(
4348
"mollifier.stale_entries",
4449
{
@@ -47,16 +52,16 @@ export const staleEntriesCounter = meter.createCounter(
4752
},
4853
);
4954

50-
export function recordStaleEntry(envId: string): void {
51-
staleEntriesCounter.add(1, { envId });
55+
// No `envId` attribute — see comment above.
56+
export function recordStaleEntry(): void {
57+
staleEntriesCounter.add(1);
5258
}
5359

54-
// Alertable signal: the count of stale entries observed by the latest
55-
// sweep, per env. The sweep snapshots the full per-env picture on each
56-
// pass (including zeros for envs that no longer have any stale entries)
57-
// so an env that was paging can clear when the drainer catches up
58-
// instead of staying latched. Recommended alert:
59-
// mollifier_stale_entries_current{envId=...} > 0 for 5m
60+
// Alertable signal: the total count of stale entries observed by the
61+
// latest sweep. The sweep snapshots the full picture on each pass so
62+
// the gauge drops back to 0 when the drainer catches up instead of
63+
// staying latched. Recommended alert:
64+
// mollifier_stale_entries_current > 0 for 5m
6065
export const staleEntriesGauge = meter.createObservableGauge(
6166
"mollifier.stale_entries.current",
6267
{
@@ -65,23 +70,22 @@ export const staleEntriesGauge = meter.createObservableGauge(
6570
},
6671
);
6772

68-
const latestStaleSnapshot = new Map<string, number>();
73+
let latestStaleTotal = 0;
6974

7075
export function reportStaleEntrySnapshot(snapshot: Map<string, number>): void {
71-
// Replace, don't merge — envs absent from the new snapshot have either
72-
// drained or no longer exist; leaving their last value cached would
73-
// keep alerts latched forever.
74-
latestStaleSnapshot.clear();
75-
for (const [envId, count] of snapshot) {
76-
latestStaleSnapshot.set(envId, count);
76+
// Sum across envs. Per-env breakdown is intentionally NOT emitted as
77+
// a metric label (high-cardinality); the structured warn log lines
78+
// from the sweep carry per-env detail for ops to drill down.
79+
let total = 0;
80+
for (const count of snapshot.values()) {
81+
total += count;
7782
}
83+
latestStaleTotal = total;
7884
}
7985

8086
meter.addBatchObservableCallback(
8187
(result) => {
82-
for (const [envId, count] of latestStaleSnapshot) {
83-
result.observe(staleEntriesGauge, count, { envId });
84-
}
88+
result.observe(staleEntriesGauge, latestStaleTotal);
8589
},
8690
[staleEntriesGauge],
8791
);

apps/webapp/test/mollifierStaleSweep.test.ts

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,21 @@ const SNAPSHOT = {
1414
};
1515

1616
function spyDeps() {
17-
const recordedStaleEnvIds: string[] = [];
17+
// Counter ticks — metric carries no `envId` label (high-cardinality)
18+
// so the spy is a simple call count. Per-env detail lives on the
19+
// structured warn log and the snapshot map.
20+
let staleEntryCount = 0;
1821
const snapshots: Array<Map<string, number>> = [];
1922
const warnings: Array<{ message: string; fields: Record<string, unknown> }> = [];
2023
return {
21-
recordedStaleEnvIds,
24+
get staleEntryCount() {
25+
return staleEntryCount;
26+
},
2227
snapshots,
2328
warnings,
2429
deps: {
25-
recordStaleEntry: (envId: string) => {
26-
recordedStaleEnvIds.push(envId);
30+
recordStaleEntry: () => {
31+
staleEntryCount += 1;
2732
},
2833
reportStaleEntrySnapshot: (snapshot: Map<string, number>) => {
2934
// Clone so post-sweep assertions see what was reported *at that
@@ -45,19 +50,20 @@ describe("runStaleSweepOnce — unit", () => {
4550
// Mirrors the prod gate: if TRIGGER_MOLLIFIER_ENABLED=0 the buffer
4651
// singleton is null and the sweep is a no-op. We don't want it to
4752
// emit a metric (or throw) just because mollifier is disabled.
48-
const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps();
53+
const spies = spyDeps();
4954
const result = await runStaleSweepOnce(
5055
{ staleThresholdMs: 1000 },
51-
{ ...deps, getBuffer: () => null },
56+
{ ...spies.deps, getBuffer: () => null },
5257
);
5358
expect(result).toEqual({
5459
orgsScanned: 0,
5560
envsScanned: 0,
5661
entriesScanned: 0,
5762
staleCount: 0,
5863
});
59-
expect(recordedStaleEnvIds).toEqual([]);
60-
expect(warnings).toEqual([]);
64+
expect(spies.staleEntryCount).toBe(0);
65+
expect(spies.warnings).toEqual([]);
66+
const snapshots = spies.snapshots;
6167
// An empty snapshot is still reported so any previously-paging env
6268
// (from a prior sweep before mollifier was disabled) clears.
6369
expect(snapshots).toHaveLength(1);
@@ -67,14 +73,15 @@ describe("runStaleSweepOnce — unit", () => {
6773

6874
describe("runStaleSweepOnce — testcontainers", () => {
6975
redisTest(
70-
"flags entries whose dwell exceeds the stale threshold and skips fresh ones",
76+
"flags every entry whose dwell exceeds the stale threshold",
7177
async ({ redisOptions }) => {
7278
const buffer = new MollifierBuffer({ redisOptions });
7379
try {
74-
// Two stale entries (one in each env) + one fresh entry. Sweep
75-
// should flag the two stale, leave the fresh one alone, record
76-
// the counter once per stale entry, and emit a warning per
77-
// stale entry with the dwell + threshold.
80+
// Three entries across two envs in the same org. The sweep below
81+
// runs against a `now` advanced by 5 minutes, so all three have
82+
// dwell ~5min and ALL THREE are stale against a 1-minute
83+
// threshold — there is no "fresh" entry in this scenario. The
84+
// assertions below pin the all-three-stale shape.
7885
await buffer.accept({
7986
runId: "run_stale_a",
8087
envId: "env_a",
@@ -88,7 +95,7 @@ describe("runStaleSweepOnce — testcontainers", () => {
8895
payload: JSON.stringify(SNAPSHOT),
8996
});
9097
await buffer.accept({
91-
runId: "run_fresh",
98+
runId: "run_stale_c",
9299
envId: "env_a",
93100
orgId: "org_1",
94101
payload: JSON.stringify(SNAPSHOT),
@@ -98,11 +105,11 @@ describe("runStaleSweepOnce — testcontainers", () => {
98105
// the threshold without actually waiting in real time.
99106
const futureNow = Date.now() + 5 * 60 * 1000;
100107

101-
const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps();
108+
const spies = spyDeps();
102109
const result = await runStaleSweepOnce(
103110
{ staleThresholdMs: 60 * 1000 },
104111
{
105-
...deps,
112+
...spies.deps,
106113
getBuffer: () => buffer,
107114
now: () => futureNow,
108115
},
@@ -111,22 +118,21 @@ describe("runStaleSweepOnce — testcontainers", () => {
111118
expect(result.envsScanned).toBe(2);
112119
expect(result.entriesScanned).toBe(3);
113120
expect(result.staleCount).toBe(3);
114-
// All three entries have dwell ~5min, all exceed the 1-min
115-
// threshold; each emits one counter tick + one warning.
116-
expect(recordedStaleEnvIds.sort()).toEqual(
117-
["env_a", "env_a", "env_b"].sort(),
118-
);
119-
expect(warnings).toHaveLength(3);
120-
for (const w of warnings) {
121+
// All three entries exceed the threshold; each emits one
122+
// counter tick + one warning.
123+
expect(spies.staleEntryCount).toBe(3);
124+
expect(spies.warnings).toHaveLength(3);
125+
for (const w of spies.warnings) {
121126
expect(w.message).toBe("mollifier.stale_entry");
122127
expect(w.fields.staleThresholdMs).toBe(60 * 1000);
123128
expect(w.fields.dwellMs).toBeGreaterThan(60 * 1000);
124129
}
125130
// Snapshot drives the alertable gauge — env_a has 2 stale
126-
// entries, env_b has 1. Both must appear so a future alert can
127-
// identify which env is paging.
128-
expect(snapshots).toHaveLength(1);
129-
expect(Object.fromEntries(snapshots[0])).toEqual({
131+
// entries, env_b has 1. Per-env detail is still passed to
132+
// `reportStaleEntrySnapshot` for forensic value even though the
133+
// gauge itself aggregates the total.
134+
expect(spies.snapshots).toHaveLength(1);
135+
expect(Object.fromEntries(spies.snapshots[0])).toEqual({
130136
env_a: 2,
131137
env_b: 1,
132138
});
@@ -151,13 +157,13 @@ describe("runStaleSweepOnce — testcontainers", () => {
151157
orgId: "org_1",
152158
payload: JSON.stringify(SNAPSHOT),
153159
});
154-
const { deps, snapshots } = spyDeps();
160+
const spies = spyDeps();
155161
await runStaleSweepOnce(
156162
{ staleThresholdMs: 60 * 1000 },
157-
{ ...deps, getBuffer: () => buffer },
163+
{ ...spies.deps, getBuffer: () => buffer },
158164
);
159-
expect(snapshots).toHaveLength(1);
160-
expect(Object.fromEntries(snapshots[0])).toEqual({ env_a: 0 });
165+
expect(spies.snapshots).toHaveLength(1);
166+
expect(Object.fromEntries(spies.snapshots[0])).toEqual({ env_a: 0 });
161167
} finally {
162168
await buffer.close();
163169
}
@@ -179,14 +185,14 @@ describe("runStaleSweepOnce — testcontainers", () => {
179185
orgId: "org_1",
180186
payload: JSON.stringify(SNAPSHOT),
181187
});
182-
const { deps, recordedStaleEnvIds, warnings } = spyDeps();
188+
const spies = spyDeps();
183189
const result = await runStaleSweepOnce(
184190
{ staleThresholdMs: 60 * 1000 },
185-
{ ...deps, getBuffer: () => buffer },
191+
{ ...spies.deps, getBuffer: () => buffer },
186192
);
187193
expect(result.staleCount).toBe(0);
188-
expect(recordedStaleEnvIds).toEqual([]);
189-
expect(warnings).toEqual([]);
194+
expect(spies.staleEntryCount).toBe(0);
195+
expect(spies.warnings).toEqual([]);
190196
} finally {
191197
await buffer.close();
192198
}
@@ -215,10 +221,10 @@ describe("runStaleSweepOnce — testcontainers", () => {
215221
payload: JSON.stringify(SNAPSHOT),
216222
});
217223
const futureNow = Date.now() + 5 * 60 * 1000;
218-
const { deps } = spyDeps();
224+
const spies = spyDeps();
219225
const result = await runStaleSweepOnce(
220226
{ staleThresholdMs: 60 * 1000 },
221-
{ ...deps, getBuffer: () => buffer, now: () => futureNow },
227+
{ ...spies.deps, getBuffer: () => buffer, now: () => futureNow },
222228
);
223229
expect(result.orgsScanned).toBe(2);
224230
expect(result.envsScanned).toBe(2);

internal-packages/run-engine/src/engine/index.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,21 @@ export class RunEngine {
599599
{ friendlyId: snapshot.friendlyId },
600600
);
601601
const existing = await prisma.taskRun.findFirst({ where: { id } });
602-
if (existing) return existing;
602+
if (existing) {
603+
// Only treat the conflict as idempotent when the existing
604+
// row is ALREADY canceled. If a non-canceled row landed
605+
// first (e.g. the drainer's normal `engine.trigger` replay
606+
// path raced ahead of the cancel) we surface a conflict
607+
// rather than silently reporting "cancelled" — the run is
608+
// genuinely live and the caller must decide between
609+
// engine.cancelRun() and skipping.
610+
if (existing.status === "CANCELED") {
611+
return existing;
612+
}
613+
throw new Error(
614+
`createCancelledRun conflict: existing run ${snapshot.friendlyId} has status ${existing.status}`,
615+
);
616+
}
603617
}
604618
throw err;
605619
}

internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,4 +230,65 @@ describe("RunEngine.createCancelledRun", () => {
230230
}
231231
},
232232
);
233+
234+
// Regression: the P2002-on-id idempotency path used to return ANY
235+
// existing row, which would silently report success even if a live
236+
// (non-CANCELED) row landed first. The guard now requires the
237+
// existing row's status to be CANCELED; anything else surfaces a
238+
// conflict so the caller can route to engine.cancelRun() or skip.
239+
containerTest(
240+
"P2002 conflict with non-CANCELED existing row throws (does not silently succeed)",
241+
async ({ prisma, redisOptions }) => {
242+
const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
243+
const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) });
244+
try {
245+
const friendlyId = freshRunId();
246+
const id = RunId.fromFriendlyId(friendlyId);
247+
248+
// Plant a live (non-CANCELED) row with the same id so the
249+
// cancelled-run INSERT hits P2002 and the guard finds a row
250+
// that ISN'T CANCELED.
251+
await prisma.taskRun.create({
252+
data: {
253+
id,
254+
friendlyId,
255+
taskIdentifier: "test-task",
256+
payload: "{}",
257+
payloadType: "application/json",
258+
status: "PENDING",
259+
runtimeEnvironmentId: env.id,
260+
projectId: env.project.id,
261+
organizationId: env.organizationId,
262+
queue: "task/test-task",
263+
traceId: "0000000000000000aaaa000000000000",
264+
spanId: "bbbb000000000000",
265+
engine: "V2",
266+
},
267+
});
268+
269+
await expect(
270+
engine.createCancelledRun({
271+
snapshot: {
272+
friendlyId,
273+
environment: env,
274+
taskIdentifier: "test-task",
275+
payload: "{}",
276+
payloadType: "application/json",
277+
context: {},
278+
traceContext: {},
279+
traceId: "0000000000000000aaaa000000000000",
280+
spanId: "bbbb000000000000",
281+
queue: "task/test-task",
282+
isTest: false,
283+
tags: [],
284+
},
285+
cancelledAt: new Date(),
286+
cancelReason: "Should not silently overwrite a live row",
287+
}),
288+
).rejects.toThrow(/createCancelledRun conflict.*PENDING/);
289+
} finally {
290+
await engine.quit();
291+
}
292+
},
293+
);
233294
});

0 commit comments

Comments
 (0)