Skip to content

Commit e81ab0e

Browse files
d-csclaude
andcommitted
docs(mollifier): strip internal planning labels from comments
Remove plan-tracking shorthand (Phase N, Q#, B6a) from mollifier comments and reword to plain English. Comment/test-only; no behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d55d1dd commit e81ab0e

11 files changed

Lines changed: 46 additions & 46 deletions

File tree

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -374,13 +374,13 @@ export class RunEngineTriggerTaskService {
374374

375375
const payloadPacket = await this.payloadProcessor.process(triggerRequest);
376376

377-
// Phase 1 dual-write: if the org has the mollifier feature flag
377+
// Dual-write: if the org has the mollifier feature flag
378378
// enabled and the per-env trip evaluator says divert, write the
379379
// canonical replay payload to the buffer AND continue through
380380
// engine.trigger as normal. The buffer entry is an audit/preview
381381
// copy; the drainer's no-op handler consumes it to prove the
382-
// dequeue mechanism works. Phase 2 will replace engine.trigger
383-
// (below) with a synthesised 200 response and rely on the
382+
// dequeue mechanism works. A later change replaces engine.trigger
383+
// (below) with a synthesised 200 response and relies on the
384384
// drainer to perform the Postgres write via replay.
385385
if (mollifierOutcome?.action === "mollify") {
386386
const buffer = this.getMollifierBuffer();
@@ -430,8 +430,8 @@ export class RunEngineTriggerTaskService {
430430
});
431431
} catch (err) {
432432
// Fail-open: buffer write must never block the customer's
433-
// trigger. engine.trigger below is the primary write path
434-
// in Phase 1 — the customer still gets a valid run.
433+
// trigger. engine.trigger below is still the primary write
434+
// path here — the customer still gets a valid run.
435435
logger.error("mollifier.buffer_accept_failed", {
436436
runId: runFriendlyId,
437437
envId: environment.id,

apps/webapp/app/v3/mollifier/bufferedTriggerPayload.server.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@ import type { TriggerTaskRequestBody } from "@trigger.dev/core/v3";
22
import type { TriggerTaskServiceOptions } from "~/v3/services/triggerTask.server";
33

44
// Canonical payload shape written to the mollifier buffer when the gate
5-
// decides to mollify a trigger. Phase 1 ALSO calls engine.trigger directly
6-
// (dual-write) so this is currently an audit/preview record. Phase 2 will
7-
// make the buffer the primary write path: the drainer's handler will read
8-
// this payload and replay it through engine.trigger to create the run in
9-
// Postgres, and read-fallback endpoints will synthesise a Run view from it
10-
// while it is still QUEUED.
5+
// decides to mollify a trigger. At this stage the call site ALSO calls
6+
// engine.trigger directly (dual-write), so this is currently an
7+
// audit/preview record. A later change makes the buffer the primary write
8+
// path: the drainer's handler reads this payload and replays it through
9+
// engine.trigger to create the run in Postgres, and read-fallback
10+
// endpoints synthesise a Run view from it while it is still QUEUED.
1111
//
12-
// CONTRACT: this shape must contain everything needed for Phase 2's
13-
// drainer-replay to reconstruct an equivalent engine.trigger call. Phase 1
14-
// emits it to logs; Phase 2 will serialise it into Redis and rebuild it on
15-
// the drain side. Keep it serialisable — no functions, no class instances.
12+
// CONTRACT: this shape must contain everything the drainer-replay needs to
13+
// reconstruct an equivalent engine.trigger call. Today it is emitted to
14+
// logs; later it is serialised into Redis and rebuilt on the drain side.
15+
// Keep it serialisable — no functions, no class instances.
1616
export type BufferedTriggerPayload = {
1717
runFriendlyId: string;
1818

apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,11 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
6868
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
6969
});
7070

71-
// Phase 1 handler: no-op ack. The trigger has ALREADY been written to
72-
// Postgres via engine.trigger (dual-write at the call site). Popping +
73-
// acking here proves the dequeue mechanism works end-to-end without
74-
// duplicating the work. Phase 2 will replace this with an engine.trigger
75-
// replay that performs the actual Postgres write.
71+
// No-op ack handler: the trigger has ALREADY been written to Postgres
72+
// via engine.trigger (dual-write at the call site). Popping + acking
73+
// here proves the dequeue mechanism works end-to-end without duplicating
74+
// the work. A later change replaces this with an engine.trigger replay
75+
// that performs the actual Postgres write.
7676
const drainer = new MollifierDrainer<BufferedTriggerPayload>({
7777
buffer,
7878
handler: async (input) => {

apps/webapp/app/v3/mollifier/mollifierGate.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ export type GateDependencies = {
7373
};
7474

7575
// `options` is a thunk so env reads happen per-evaluation, not at module load.
76-
// Don't "simplify" to a plain object — Phase 2 dynamic config relies on the
76+
// Don't "simplify" to a plain object — dynamic config relies on the
7777
// gate observing whichever env values are live at trigger time.
7878
const defaultEvaluator = createRealTripEvaluator({
7979
getBuffer: () => getMollifierBuffer(),

apps/webapp/app/v3/mollifier/mollifierTripEvaluator.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ export function createRealTripEvaluator(deps: CreateRealTripEvaluatorDeps): Trip
3535
} catch (err) {
3636
// Deliberate: no error counter here. Shadow mode means a silent miss is
3737
// harmless — fail-open is the safe direction. The error log + Sentry
38-
// capture is sufficient operability for Phase 1. Revisit in Phase 2
39-
// when buffer writes are the primary path and a missed evaluation has cost.
38+
// capture is sufficient operability while this runs in shadow mode. Revisit
39+
// once buffer writes are the primary path and a missed evaluation has cost.
4040
logger.error("mollifier trip evaluator: fail-open on error", {
4141
envId: inputs.envId,
4242
err: err instanceof Error ? err.message : String(err),

apps/webapp/app/v3/mollifierDrainerWorker.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ export function initMollifierDrainerWorker(
9797
// Deterministic misconfig (shutdown-timeout vs GRACEFUL_SHUTDOWN_TIMEOUT,
9898
// missing buffer client) is a deploy-time mistake the operator must
9999
// see immediately — rethrow so the process crashes, health checks
100-
// fail, and the orchestrator rolls the deploy back. Phase 1 is
101-
// monitoring-only and the silent-fallback was tempting, but Phase 2/3
100+
// fail, and the orchestrator rolls the deploy back. The drainer is currently
101+
// monitoring-only and the silent-fallback was tempting, but later phases
102102
// make the drainer the source of truth for diverted triggers, where a
103103
// silently-disabled drainer means data loss. Better to fail loud now
104104
// than retrofit later.

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,7 +1404,7 @@ describe("RunEngineTriggerTaskService", () => {
14041404
// SCENARIO: dual-write where buffer.accept succeeds but engine.trigger
14051405
// throws. The throw propagates to the caller (correct: customer sees
14061406
// the same 4xx as today), and the buffer entry remains as an "orphan"
1407-
// — Phase 1's no-op drainer will pop+ack it on its next poll, so the
1407+
// — the no-op drainer will pop+ack it on its next poll, so the
14081408
// orphan is bounded (~drainer pollIntervalMs) but observable in the
14091409
// audit trail (mollifier.buffered with no matching TaskRun).
14101410
//
@@ -1416,11 +1416,11 @@ describe("RunEngineTriggerTaskService", () => {
14161416
// - RunOneTimeUseTokenError (Prisma P2002 on oneTimeUseToken).
14171417
// - Transient Prisma errors (FK constraint, connection drop, etc.).
14181418
//
1419-
// Why we don't "fix" this race in Phase 1:
1419+
// Why we don't "fix" this race now:
14201420
// The customer correctly gets the error. State eventually converges
14211421
// (drainer pops the orphan). The audit-trail explicitly surfaces
14221422
// "buffered without TaskRun" entries to operators. A real fix is
1423-
// Phase 2's responsibility once the buffer becomes the primary write
1423+
// a later change's responsibility once the buffer becomes the primary write
14241424
// — at that point we add the mollifier-specific idempotency index.
14251425
//
14261426
// This test pins the current ordering: buffer.accept fires synchronously
@@ -1491,7 +1491,7 @@ describe("RunEngineTriggerTaskService", () => {
14911491

14921492
// The buffer write happened BEFORE engine.trigger threw. The orphan
14931493
// remains; the audit-trail will surface it (mollifier.buffered with
1494-
// no matching TaskRun row). Phase 1's no-op drainer cleans it up.
1494+
// no matching TaskRun row). The no-op drainer cleans it up.
14951495
expect(buffer.accepted).toHaveLength(1);
14961496
const orphanPayload = JSON.parse(buffer.accepted[0]!.payload);
14971497
expect(orphanPayload.taskId).toBe(taskIdentifier);
@@ -1617,22 +1617,22 @@ describe("RunEngineTriggerTaskService", () => {
16171617
// service correctly returns the existing run id to the customer, but
16181618
// the buffer is left with an orphan entry for the new friendlyId.
16191619
//
1620-
// Why this is acceptable in Phase 1:
1620+
// Why this is acceptable now:
16211621
// - Customer-facing behaviour is unchanged from today: they receive
16221622
// the existing run id, same as the non-mollified path.
16231623
// - The orphan is bounded — the drainer's no-op-ack handler pops
16241624
// and acks it on its next poll.
16251625
// - The audit-trail surfaces it: a `mollifier.buffered` log line
16261626
// with `runId` that has no matching TaskRun in Postgres.
16271627
//
1628-
// Why Phase 2 cares:
1628+
// Why a later change cares:
16291629
// - When the buffer becomes the primary write path, debounce can
16301630
// no longer be allowed to run AFTER buffer.accept. The drainer's
16311631
// engine.trigger replay would observe "existing" and skip the
16321632
// persist — the customer's synthesised 200 (with the new
16331633
// friendlyId) would never get a TaskRun, and the audit-trail
16341634
// divergence becomes a real data-loss bug.
1635-
// - Phase 2 must lift `handleDebounce` into the call site BEFORE
1635+
// - A later change must lift `handleDebounce` into the call site BEFORE
16361636
// buffer.accept:
16371637
// 1. handleDebounce → if existing, return existing run; do NOT
16381638
// touch the buffer.

packages/redis-worker/src/mollifier/buffer.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1074,7 +1074,7 @@ describe("MollifierBuffer.accept idempotency", () => {
10741074
{ timeout: 20_000 },
10751075
async ({ redisContainer }) => {
10761076
// After ack, the entry hash persists for the grace window as a
1077-
// read-fallback safety net (Q1 D2). RunIds are server-generated and
1077+
// read-fallback safety net. RunIds are server-generated and
10781078
// never collide in practice, but defense-in-depth: accept refuses
10791079
// while *any* entry exists for the runId, including materialised
10801080
// ones. The entry hash's TTL is now ~30s instead of the original

packages/redis-worker/src/mollifier/buffer.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export type MollifierBufferOptions = {
1515

1616
// Grace TTL applied to the entry hash on drainer ack. The entry survives
1717
// this long after materialisation so direct reads (retrieve, trace, etc.)
18-
// have a safety net while PG replica lag settles. Q1 D2.
18+
// have a safety net while PG replica lag settles.
1919
const ACK_GRACE_TTL_SECONDS = 30;
2020

2121
// ioredis reconnect backoff for the mollifier buffer client. The base
@@ -80,7 +80,7 @@ export function idempotencyLookupKeyFor(input: IdempotencyLookupInput): string {
8080
}
8181

8282
// Pre-gate claim key namespace, distinct from `mollifier:idempotency` so the
83-
// existing B6a buffer-side dedup stays isolated. The claim is the
83+
// existing buffer-side dedup stays isolated. The claim is the
8484
// authoritative cross-store "this idempotency key is in flight or
8585
// resolved" pointer used by the trigger hot path. Values:
8686
// "pending:<token>" → claimed by a trigger pipeline; `<token>` is the
@@ -143,7 +143,7 @@ export class MollifierBuffer {
143143
// SETNX a Redis lookup at `mollifier:idempotency:{env}:{task}:{key}`
144144
// pointing at the runId so trigger-time dedup during the buffered
145145
// window resolves the same way PG's unique constraint resolves it
146-
// post-materialisation (Q5).
146+
// post-materialisation.
147147
idempotencyKey?: string;
148148
taskIdentifier?: string;
149149
}): Promise<AcceptResult> {
@@ -277,7 +277,7 @@ export class MollifierBuffer {
277277
// - "not_found": no entry hash exists for this runId — including a
278278
// FAILED entry, whose hash the drainer-terminal `fail` path DELs.
279279
// - "busy": entry is DRAINING or materialised. The API
280-
// wait-and-bounces through PG (Q3 design).
280+
// wait-and-bounces through PG.
281281
async mutateSnapshot(runId: string, patch: SnapshotPatch): Promise<MutateSnapshotResult> {
282282
const result = (await this.redis.mutateMollifierSnapshot(
283283
`mollifier:entries:${runId}`,
@@ -325,7 +325,7 @@ export class MollifierBuffer {
325325

326326
// Atomic pre-gate claim on a (env, task, idempotencyKey) tuple. One
327327
// call across both PG and buffer paths serialises through this claim;
328-
// closes the race the buffer-side B6a SETNX leaves open during the
328+
// closes the race the buffer-side SETNX leaves open during the
329329
// gate-transition burst window.
330330
//
331331
// The caller supplies an opaque `token` (UUID) on claim. The same token
@@ -443,8 +443,8 @@ export class MollifierBuffer {
443443
// Marks the entry as materialised (PG row written) and resets its TTL to
444444
// the grace window. Entry hash persists past ack as a read-fallback
445445
// safety net for the brief PG replica-lag window between drainer-side
446-
// write and reader-side visibility (Q1 D2). Also clears the associated
447-
// idempotency lookup if one was set on accept (Q5).
446+
// write and reader-side visibility. Also clears the associated
447+
// idempotency lookup if one was set on accept.
448448
async ack(runId: string): Promise<void> {
449449
await this.redis.ackMollifierEntry(
450450
`mollifier:entries:${runId}`,
@@ -531,7 +531,7 @@ export class MollifierBuffer {
531531
return 0
532532
end
533533
534-
-- Idempotency-key dedup (Q5). If the caller passed a lookup key
534+
-- Idempotency-key dedup. If the caller passed a lookup key
535535
-- and it's already bound to another buffered run, return the
536536
-- winner's runId so the loser's API response can echo it as a
537537
-- cached hit. Otherwise SET the lookup (no TTL — lifecycle is
@@ -606,7 +606,7 @@ export class MollifierBuffer {
606606
-- Requeue RPUSHes to the tail (the RPOP end) so a transiently
607607
-- failed entry pops next rather than going to the back of the
608608
-- line behind a fresh backlog. createdAt is immutable across
609-
-- retries (Phase 3b decision); the drainer's maxAttempts caps the
609+
-- retries; the drainer's maxAttempts caps the
610610
-- retry loop so a poisoned entry doesn't head-of-line forever.
611611
redis.call('RPUSH', queuePrefix .. envId, runId)
612612
-- Re-track the org/env: pop may have SREM'd them when the queue
@@ -920,7 +920,7 @@ export class MollifierBuffer {
920920
921921
-- If the entry was accepted with an idempotency key, the lookup
922922
-- string was stored on the hash at accept time. Clear it now —
923-
-- PG becomes canonical for the key post-materialisation (Q5).
923+
-- PG becomes canonical for the key post-materialisation.
924924
local lookupKey = redis.call('HGET', entryKey, 'idempotencyLookupKey')
925925
if lookupKey and lookupKey ~= '' then
926926
redis.call('DEL', lookupKey)

packages/redis-worker/src/mollifier/drainer.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ describe("MollifierDrainer.runOnce", () => {
8787
});
8888

8989
// After ack the entry persists as a read-fallback safety net with
90-
// materialised=true and a fresh grace TTL (Q1 D2 / Phase B2).
90+
// materialised=true and a fresh grace TTL.
9191
const entry = await buffer.getEntry("run_1");
9292
expect(entry).not.toBeNull();
9393
expect(entry!.materialised).toBe(true);
@@ -981,7 +981,7 @@ describe("MollifierDrainer additional coverage", () => {
981981
// ack() lives inside the same try as the handler call, so if the
982982
// handler succeeds but ack throws (e.g. transient Redis blip), the
983983
// entry is routed through the retry/terminal path even though the
984-
// handler-side work completed. Phase 2's engine-replay handler will
984+
// handler-side work completed. A later engine-replay handler will
985985
// need idempotency to absorb the re-execution this implies on retry,
986986
// OR ack should be lifted out of the try block.
987987
let handlerCalls = 0;

0 commit comments

Comments
 (0)