Commit 906d5fa

d-cs and claude authored
feat(mollifier): trigger burst smoothing — Phase 1 (monitoring) (#3614)
## Summary

- Introduce the Mollifier: a Redis-backed buffer for `trigger()` API calls during traffic spikes, with a per-env trip evaluator and a drainer ack-loop.
- Phase 1 is dual-write monitoring — every mollified trigger is buffered to Redis AND continues to `engine.trigger`. No customer-facing behaviour change.
- Telemetry events: `mollifier.would_mollify`, `mollifier.buffered`, `mollifier.drained`, plus the `mollifier.decisions` counter.
- Gated behind a feature flag (default off).

## Test plan

- [x] `pnpm run test --filter @trigger.dev/redis-worker`
- [x] `pnpm run test --filter webapp -- mollifier`
- [x] Manual: with flag off, no behaviour change vs main
- [x] Manual: with flag on + threshold lowered, observe `mollifier.buffered` + `mollifier.drained` log pairs with matching `runId`

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6c9f1f1 commit 906d5fa

29 files changed

Lines changed: 5247 additions & 0 deletions
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+---
+"@trigger.dev/redis-worker": patch
+---
+
+Add MollifierBuffer and MollifierDrainer primitives for trigger burst smoothing.
+
+MollifierBuffer (`accept`, `pop`, `ack`, `requeue`, `fail`, `evaluateTrip`) is a per-env FIFO over Redis with atomic Lua transitions for status tracking. `evaluateTrip` is a sliding-window trip evaluator the webapp gate uses to detect per-env trigger bursts.
+
+MollifierDrainer pops entries through a polling loop with a user-supplied handler. The loop survives transient Redis errors via capped exponential backoff (up to 5s), and per-env pop failures don't poison the rest of the batch — one env's blip is logged and counted as failed for that tick. Rotation is two-level: orgs at the top, envs within each org. The buffer maintains `mollifier:orgs` and `mollifier:org-envs:${orgId}` atomically with per-env queues, so the drainer walks orgs → envs directly without an in-memory cache. The `maxOrgsPerTick` option (default 500) caps how many orgs are scheduled per tick; for each picked org, one env is popped (rotating round-robin within the org). An org with N envs gets the same per-tick scheduling slot as an org with 1 env, so tenant-level drainage throughput is determined by org count rather than env count.
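
To make the two-level rotation described in this changeset concrete, here is a minimal in-memory sketch. It is not the MollifierDrainer implementation: the class `TwoLevelRotation`, its `pickForTick` method, the sorted org order, and the use of a plain `Map` in place of the `mollifier:orgs` and `mollifier:org-envs:${orgId}` Redis structures are all assumptions made for illustration.

```typescript
// Hypothetical in-memory model of "orgs at the top, envs within each org".
// A Map stands in for the Redis sets the real buffer maintains.
type OrgId = string;
type EnvId = string;

class TwoLevelRotation {
  private orgCursor = 0;
  private readonly envCursors = new Map<OrgId, number>();
  private readonly maxOrgsPerTick: number;

  constructor(maxOrgsPerTick: number) {
    this.maxOrgsPerTick = maxOrgsPerTick;
  }

  // Picks at most `maxOrgsPerTick` orgs and exactly one env per picked org.
  pickForTick(orgEnvs: Map<OrgId, EnvId[]>): EnvId[] {
    // Sorting is an assumption that gives this sketch a stable walk order.
    const orgs = Array.from(orgEnvs.keys()).sort();
    if (orgs.length === 0) return [];

    const count = Math.min(this.maxOrgsPerTick, orgs.length);
    const picked: EnvId[] = [];

    for (let i = 0; i < count; i++) {
      const orgId = orgs[(this.orgCursor + i) % orgs.length];
      if (orgId === undefined) continue;

      const envs = orgEnvs.get(orgId) ?? [];
      const envCursor = this.envCursors.get(orgId) ?? 0;
      const envId = envs[envCursor % envs.length];
      if (envId === undefined) continue; // org currently has no envs

      picked.push(envId);
      // Advance this org's env cursor so its next slot goes to the next env.
      this.envCursors.set(orgId, envCursor + 1);
    }

    // Resume the org walk where this tick stopped.
    this.orgCursor = (this.orgCursor + count) % orgs.length;
    return picked;
  }
}
```

In this model an org with three envs still receives one slot per tick, the same as an org with one env, which is the fairness property the changeset describes.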
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: feature
+---
+
+Lay the groundwork for an opt-in burst-protection layer on the trigger hot path. This release ships **monitoring only** — operators can observe per-env trigger storms via two opt-in modes, but no trigger calls are diverted or rate-limited yet (active burst smoothing follows in a later release). All new env vars are prefixed `TRIGGER_MOLLIFIER_*` and default off, so existing deployments see no behaviour change. With `TRIGGER_MOLLIFIER_SHADOW_MODE=1`, each trigger evaluates a per-env rate counter and logs `mollifier.would_mollify` when the threshold is crossed. With `TRIGGER_MOLLIFIER_ENABLED=1` plus a per-org `mollifierEnabled` flag, over-threshold triggers are also recorded in a Redis audit buffer alongside the normal `engine.trigger` call, drained by a background no-op consumer. The drainer has its own switch (`TRIGGER_MOLLIFIER_DRAINER_ENABLED`) so multi-replica deployments can pin the polling loop to a single worker service while every replica still produces into the buffer; unset, it inherits `TRIGGER_MOLLIFIER_ENABLED` so single-container self-hosters need only one flag. Drainer misconfiguration (shutdown-timeout reconciliation against `GRACEFUL_SHUTDOWN_TIMEOUT`, or `TRIGGER_MOLLIFIER_ENABLED=1` with no buffer Redis) now throws `MollifierConfigurationError` at boot and crashes the process, so the misconfig surfaces to the orchestrator instead of disappearing into a log line; transient init failures (Redis blip) are still logged-and-swallowed. Emits the `mollifier.decisions` OTel counter for per-env rate visibility.
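
The per-env rate counter behind `mollifier.would_mollify` can be pictured as a sliding-window count. The sketch below is an in-memory approximation only: the redis-worker changeset describes `evaluateTrip` as running over Redis, and the class `SlidingWindowTrip`, its `record` method, and the choice to trip when the count strictly exceeds the threshold are assumptions rather than the shipped behaviour.

```typescript
// Hypothetical in-memory stand-in for a per-env sliding-window trip check.
class SlidingWindowTrip {
  private readonly hits = new Map<string, number[]>();
  private readonly windowMs: number;
  private readonly threshold: number;

  constructor(windowMs: number, threshold: number) {
    this.windowMs = windowMs;
    this.threshold = threshold;
  }

  // Records one trigger for `envId` at `nowMs` and reports whether the env
  // has more than `threshold` triggers inside the trailing window.
  record(envId: string, nowMs: number): boolean {
    const cutoff = nowMs - this.windowMs;
    // Drop timestamps that have slid out of the window.
    const recent = (this.hits.get(envId) ?? []).filter((t) => t > cutoff);
    recent.push(nowMs);
    this.hits.set(envId, recent);
    return recent.length > this.threshold;
  }
}
```

Passing the clock in as `nowMs` keeps the sketch deterministic. Each env has its own timestamp list, so a burst in one env does not trip another.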

apps/webapp/app/entry.server.tsx

Lines changed: 3 additions & 0 deletions
@@ -6,6 +6,7 @@ import isbot from "isbot";
 import { renderToPipeableStream } from "react-dom/server";
 import { PassThrough } from "stream";
 import * as Worker from "~/services/worker.server";
+import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
 import { bootstrap } from "./bootstrap";
 import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
 import {
@@ -247,6 +248,8 @@ Worker.init().catch((error) => {
   logError(error);
 });
 
+initMollifierDrainerWorker();
+
 bootstrap().catch((error) => {
   logError(error);
 });

apps/webapp/app/env.server.ts

Lines changed: 41 additions & 0 deletions
@@ -1054,6 +1054,47 @@ const EnvironmentSchema = z
     COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
     COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
 
+    TRIGGER_MOLLIFIER_ENABLED: z.string().default("0"),
+    // Separate switch for the drainer (consumer side) so it can be split
+    // off onto a dedicated worker service. Unset → inherits
+    // TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to
+    // flip two switches. In multi-replica deployments, set this to "0"
+    // explicitly on every replica except the one dedicated drainer
+    // service — otherwise every replica's polling loop races for the
+    // same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill
+    // switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
+    // no-op because the gate-side singleton refuses to construct a
+    // buffer when the system is off.
+    TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
+    TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
+    TRIGGER_MOLLIFIER_REDIS_HOST: z
+      .string()
+      .optional()
+      .transform((v) => v ?? process.env.REDIS_HOST),
+    TRIGGER_MOLLIFIER_REDIS_PORT: z.coerce
+      .number()
+      .optional()
+      .transform(
+        (v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined),
+      ),
+    TRIGGER_MOLLIFIER_REDIS_USERNAME: z
+      .string()
+      .optional()
+      .transform((v) => v ?? process.env.REDIS_USERNAME),
+    TRIGGER_MOLLIFIER_REDIS_PASSWORD: z
+      .string()
+      .optional()
+      .transform((v) => v ?? process.env.REDIS_PASSWORD),
+    TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
+    TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
+    TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
+    TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
+    TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
+    TRIGGER_MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),
+    TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
+    TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
+    TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
+
     BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
       .number()
       .int()
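
The interaction between the master switch and the drainer switch in the schema comment above can be summarised as a small truth table. The following is a sketch under stated assumptions: `resolveMollifierRoles`, `MollifierEnv`, and `MollifierRoles` are hypothetical names, and the function models only the inheritance and kill-switch rules from the comment, not the per-org `mollifierEnabled` flag or any Redis wiring.

```typescript
// Hypothetical helper that mirrors the rules stated in the schema comment.
type MollifierEnv = {
  TRIGGER_MOLLIFIER_ENABLED?: string;
  TRIGGER_MOLLIFIER_DRAINER_ENABLED?: string;
  TRIGGER_MOLLIFIER_SHADOW_MODE?: string;
};

type MollifierRoles = {
  shadowMode: boolean;
  bufferEnabled: boolean;
  runsDrainer: boolean;
};

function resolveMollifierRoles(env: MollifierEnv): MollifierRoles {
  const enabled = (env.TRIGGER_MOLLIFIER_ENABLED ?? "0") === "1";
  // An unset drainer flag inherits the master flag.
  const drainerFlag =
    env.TRIGGER_MOLLIFIER_DRAINER_ENABLED ?? env.TRIGGER_MOLLIFIER_ENABLED ?? "0";

  return {
    shadowMode: (env.TRIGGER_MOLLIFIER_SHADOW_MODE ?? "0") === "1",
    bufferEnabled: enabled,
    // The master flag is the kill switch: drainer "1" with master "0" is a no-op.
    runsDrainer: enabled && drainerFlag === "1",
  };
}
```

Under this reading, a single-container deployment sets only `TRIGGER_MOLLIFIER_ENABLED=1`, while a multi-replica deployment sets `TRIGGER_MOLLIFIER_DRAINER_ENABLED=0` on every replica except the dedicated drainer service.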

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 114 additions & 0 deletions
@@ -40,6 +40,18 @@ import type {
   TriggerTaskRequest,
   TriggerTaskValidator,
 } from "../types";
+import { env } from "~/env.server";
+import {
+  evaluateGate as defaultEvaluateGate,
+  type GateOutcome,
+  type MollifierEvaluateGate,
+} from "~/v3/mollifier/mollifierGate.server";
+import {
+  getMollifierBuffer as defaultGetMollifierBuffer,
+  type MollifierGetBuffer,
+} from "~/v3/mollifier/mollifierBuffer.server";
+import { buildBufferedTriggerPayload } from "~/v3/mollifier/bufferedTriggerPayload.server";
+import { serialiseSnapshot } from "@trigger.dev/redis-worker";
 import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server";
 
 class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
@@ -59,6 +71,14 @@ export class RunEngineTriggerTaskService {
   private readonly traceEventConcern: TraceEventConcern;
   private readonly triggerRacepointSystem: TriggerRacepointSystem;
   private readonly metadataMaximumSize: number;
+  // Mollifier hooks are DI'd so tests can drive the call-site's mollify branch
+  // deterministically (stub the gate to return mollify, inject a real or fake
+  // buffer, force the global-enabled predicate to true so the call site
+  // doesn't short-circuit on an unset env). In production all three default
+  // to the live module-level singletons + env read.
+  private readonly evaluateGate: MollifierEvaluateGate;
+  private readonly getMollifierBuffer: MollifierGetBuffer;
+  private readonly isMollifierGloballyEnabled: () => boolean;
 
   constructor(opts: {
     prisma: PrismaClientOrTransaction;
@@ -71,6 +91,9 @@ export class RunEngineTriggerTaskService {
     tracer: Tracer;
     metadataMaximumSize: number;
     triggerRacepointSystem?: TriggerRacepointSystem;
+    evaluateGate?: MollifierEvaluateGate;
+    getMollifierBuffer?: MollifierGetBuffer;
+    isMollifierGloballyEnabled?: () => boolean;
   }) {
     this.prisma = opts.prisma;
     this.engine = opts.engine;
@@ -82,6 +105,10 @@ export class RunEngineTriggerTaskService {
     this.traceEventConcern = opts.traceEventConcern;
     this.metadataMaximumSize = opts.metadataMaximumSize;
     this.triggerRacepointSystem = opts.triggerRacepointSystem ?? new NoopTriggerRacepointSystem();
+    this.evaluateGate = opts.evaluateGate ?? defaultEvaluateGate;
+    this.getMollifierBuffer = opts.getMollifierBuffer ?? defaultGetMollifierBuffer;
+    this.isMollifierGloballyEnabled =
+      opts.isMollifierGloballyEnabled ?? (() => env.TRIGGER_MOLLIFIER_ENABLED === "1");
   }
 
   public async call({
@@ -316,6 +343,25 @@ export class RunEngineTriggerTaskService {
         taskKind: taskKind ?? "STANDARD",
       };
 
+      // Short-circuit before the gate when mollifier is globally off (the
+      // default for every deployment that hasn't opted in). Avoids the
+      // GateInputs allocation, the deps spread inside `evaluateGate`, and
+      // the `mollifier.decisions{outcome=pass_through}` OTel increment on
+      // every trigger — `triggerTask` is the highest-throughput code path
+      // in the system. The check goes through a DI'd predicate so unit
+      // tests that inject a custom `evaluateGate` can also override the
+      // gate-on check (the default reads `env.TRIGGER_MOLLIFIER_ENABLED`,
+      // which is "0" in CI where no .env file is present).
+      const mollifierOutcome: GateOutcome | null = this.isMollifierGloballyEnabled()
+        ? await this.evaluateGate({
+            envId: environment.id,
+            orgId: environment.organizationId,
+            taskId,
+            orgFeatureFlags:
+              (environment.organization.featureFlags as Record<string, unknown> | null) ?? null,
+          })
+        : null;
+
       try {
         return await this.traceEventConcern.traceRun(
           triggerRequest,
@@ -328,6 +374,74 @@ export class RunEngineTriggerTaskService {
 
             const payloadPacket = await this.payloadProcessor.process(triggerRequest);
 
+            // Phase 1 dual-write: if the org has the mollifier feature flag
+            // enabled and the per-env trip evaluator says divert, write the
+            // canonical replay payload to the buffer AND continue through
+            // engine.trigger as normal. The buffer entry is an audit/preview
+            // copy; the drainer's no-op handler consumes it to prove the
+            // dequeue mechanism works. Phase 2 will replace engine.trigger
+            // (below) with a synthesised 200 response and rely on the
+            // drainer to perform the Postgres write via replay.
+            if (mollifierOutcome?.action === "mollify") {
+              const buffer = this.getMollifierBuffer();
+              if (buffer) {
+                const canonicalPayload = buildBufferedTriggerPayload({
+                  runFriendlyId,
+                  taskId,
+                  envId: environment.id,
+                  envType: environment.type,
+                  envSlug: environment.slug,
+                  orgId: environment.organizationId,
+                  orgSlug: environment.organization.slug,
+                  projectId: environment.projectId,
+                  projectRef: environment.project.externalRef,
+                  body,
+                  idempotencyKey: idempotencyKey ?? null,
+                  idempotencyKeyExpiresAt: idempotencyKey
+                    ? idempotencyKeyExpiresAt ?? null
+                    : null,
+                  tags,
+                  parentRunFriendlyId: parentRun?.friendlyId ?? null,
+                  traceContext: event.traceContext,
+                  triggerSource,
+                  triggerAction,
+                  serviceOptions: options,
+                  createdAt: new Date(),
+                });
+
+                try {
+                  const serialisedPayload = serialiseSnapshot(canonicalPayload);
+                  await buffer.accept({
+                    runId: runFriendlyId,
+                    envId: environment.id,
+                    orgId: environment.organizationId,
+                    payload: serialisedPayload,
+                  });
+                  // Light log on the hot path — keep this synchronous work
+                  // O(1) per trigger. The drainer computes the payload hash
+                  // off-path; operators correlate `mollifier.buffered` →
+                  // `mollifier.drained` by runId.
+                  logger.debug("mollifier.buffered", {
+                    runId: runFriendlyId,
+                    envId: environment.id,
+                    orgId: environment.organizationId,
+                    taskId,
+                    payloadBytes: serialisedPayload.length,
+                  });
+                } catch (err) {
+                  // Fail-open: buffer write must never block the customer's
+                  // trigger. engine.trigger below is the primary write path
+                  // in Phase 1 — the customer still gets a valid run.
+                  logger.error("mollifier.buffer_accept_failed", {
+                    runId: runFriendlyId,
+                    envId: environment.id,
+                    taskId,
+                    err: err instanceof Error ? err.message : String(err),
+                  });
+                }
+              }
+            }
+
             const taskRun = await this.engine.trigger(
               {
                 friendlyId: runFriendlyId,
apps/webapp/app/services/worker.server.ts

Lines changed: 21 additions & 0 deletions
@@ -1,3 +1,24 @@
+/**
+ * ⚠️ LEGACY — Graphile-worker / ZodWorker setup. Do not touch.
+ *
+ * This file wires the original background-job system the webapp was
+ * built on (`@internal/zod-worker` → graphile-worker → Postgres). It is
+ * now in deprecation mode: every task in `workerCatalog` below is
+ * annotated with `@deprecated, moved to <new home>` and the live jobs
+ * for new features all run on `@trigger.dev/redis-worker` instead.
+ *
+ * Where to put new things:
+ *   - Background jobs / queues → use redis-worker, alongside
+ *     `~/v3/commonWorker.server.ts`, `~/v3/alertsWorker.server.ts`, or
+ *     `~/v3/batchTriggerWorker.server.ts`.
+ *   - Run lifecycle → `@internal/run-engine` via `~/v3/runEngine.server`.
+ *   - Custom polling loops with their own Redis connection → keep them
+ *     in their own lifecycle module (e.g. `~/v3/mollifierDrainerWorker.server.ts`)
+ *     and wire the bootstrap from `entry.server.tsx`. Don't reach into
+ *     `init()` below.
+ *
+ * Edit only when removing legacy paths.
+ */
 import { ZodWorker } from "@internal/zod-worker";
 import { DeliverEmailSchema } from "emails";
 import { z } from "zod";

apps/webapp/app/v3/featureFlags.ts

Lines changed: 2 additions & 0 deletions
@@ -8,6 +8,7 @@ export const FEATURE_FLAG = {
   hasAiAccess: "hasAiAccess",
   hasComputeAccess: "hasComputeAccess",
   hasPrivateConnections: "hasPrivateConnections",
+  mollifierEnabled: "mollifierEnabled",
 } as const;
 
 export const FeatureFlagCatalog = {
@@ -18,6 +19,7 @@ export const FeatureFlagCatalog = {
   [FEATURE_FLAG.hasAiAccess]: z.coerce.boolean(),
   [FEATURE_FLAG.hasComputeAccess]: z.coerce.boolean(),
   [FEATURE_FLAG.hasPrivateConnections]: z.coerce.boolean(),
+  [FEATURE_FLAG.mollifierEnabled]: z.coerce.boolean(),
 };
 
 export type FeatureFlagKey = keyof typeof FeatureFlagCatalog;
Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
+import type { TriggerTaskRequestBody } from "@trigger.dev/core/v3";
+import type { TriggerTaskServiceOptions } from "~/v3/services/triggerTask.server";
+
+// Canonical payload shape written to the mollifier buffer when the gate
+// decides to mollify a trigger. Phase 1 ALSO calls engine.trigger directly
+// (dual-write) so this is currently an audit/preview record. Phase 2 will
+// make the buffer the primary write path: the drainer's handler will read
+// this payload and replay it through engine.trigger to create the run in
+// Postgres, and read-fallback endpoints will synthesise a Run view from it
+// while it is still QUEUED.
+//
+// CONTRACT: this shape must contain everything needed for Phase 2's
+// drainer-replay to reconstruct an equivalent engine.trigger call. Phase 1
+// emits it to logs; Phase 2 will serialise it into Redis and rebuild it on
+// the drain side. Keep it serialisable — no functions, no class instances.
+export type BufferedTriggerPayload = {
+  runFriendlyId: string;
+
+  // Routing identifiers — let the drainer re-fetch full AuthenticatedEnvironment
+  // at replay time rather than embedding it in the payload.
+  envId: string;
+  envType: string;
+  envSlug: string;
+  orgId: string;
+  orgSlug: string;
+  projectId: string;
+  projectRef: string;
+
+  // Task identifier — looked up against the locked BackgroundWorkerTask
+  // at replay time to recover task-defaults.
+  taskId: string;
+
+  // Customer-supplied trigger body (payload, options, context).
+  body: TriggerTaskRequestBody;
+
+  // Resolved values from upstream concerns. The drainer should NOT re-resolve
+  // these — that would create a second idempotency-key check, etc.
+  idempotencyKey: string | null;
+  idempotencyKeyExpiresAt: string | null;
+  tags: string[];
+
+  // Parent/root linkage for nested triggers.
+  parentRunFriendlyId: string | null;
+
+  // Trace context — propagates the original triggering span across the
+  // buffer→drain boundary so the run's lifecycle stays under one trace.
+  traceContext: Record<string, unknown>;
+
+  // Annotations + service options that influence routing/replay.
+  triggerSource: string;
+  triggerAction: string;
+  serviceOptions: TriggerTaskServiceOptions;
+
+  // Wall-clock instants relevant to the run.
+  createdAt: string;
+};
+
+// Assemble the canonical payload from the inputs available at the point
+// `evaluateGate` returns "mollify" in `RunEngineTriggerTaskService.call`.
+// All fields must be derivable from data already in scope at that call site;
+// nothing should require an extra DB lookup.
+export function buildBufferedTriggerPayload(input: {
+  runFriendlyId: string;
+  taskId: string;
+  envId: string;
+  envType: string;
+  envSlug: string;
+  orgId: string;
+  orgSlug: string;
+  projectId: string;
+  projectRef: string;
+  body: TriggerTaskRequestBody;
+  idempotencyKey: string | null;
+  idempotencyKeyExpiresAt: Date | null;
+  tags: string[];
+  parentRunFriendlyId: string | null;
+  traceContext: Record<string, unknown>;
+  triggerSource: string;
+  triggerAction: string;
+  serviceOptions: TriggerTaskServiceOptions;
+  createdAt: Date;
+}): BufferedTriggerPayload {
+  return {
+    runFriendlyId: input.runFriendlyId,
+    envId: input.envId,
+    envType: input.envType,
+    envSlug: input.envSlug,
+    orgId: input.orgId,
+    orgSlug: input.orgSlug,
+    projectId: input.projectId,
+    projectRef: input.projectRef,
+    taskId: input.taskId,
+    body: input.body,
+    idempotencyKey: input.idempotencyKey,
+    idempotencyKeyExpiresAt:
+      input.idempotencyKey && input.idempotencyKeyExpiresAt
+        ? input.idempotencyKeyExpiresAt.toISOString()
+        : null,
+    tags: input.tags,
+    parentRunFriendlyId: input.parentRunFriendlyId,
+    traceContext: input.traceContext,
+    triggerSource: input.triggerSource,
+    triggerAction: input.triggerAction,
+    serviceOptions: input.serviceOptions,
+    createdAt: input.createdAt.toISOString(),
+  };
+}
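
The "keep it serialisable" contract above is why the builder converts `Date` inputs to ISO strings before the payload is stored. A reduced sketch of that idea follows. The `Snapshot` type and `toSnapshot` function are hypothetical and cover only three fields, and `JSON.stringify` stands in for `serialiseSnapshot`, whose actual encoding is not shown in this commit.

```typescript
// Hypothetical three-field reduction of the buffered payload.
type Snapshot = {
  runFriendlyId: string;
  idempotencyKeyExpiresAt: string | null;
  createdAt: string;
};

function toSnapshot(input: {
  runFriendlyId: string;
  idempotencyKey: string | null;
  idempotencyKeyExpiresAt: Date | null;
  createdAt: Date;
}): Snapshot {
  return {
    runFriendlyId: input.runFriendlyId,
    // As in the builder above, the expiry is kept only when a key is present.
    idempotencyKeyExpiresAt:
      input.idempotencyKey && input.idempotencyKeyExpiresAt
        ? input.idempotencyKeyExpiresAt.toISOString()
        : null,
    createdAt: input.createdAt.toISOString(),
  };
}

// JSON is an assumption here; it is one encoding that round-trips plain data.
function roundTrip(snapshot: Snapshot): Snapshot {
  return JSON.parse(JSON.stringify(snapshot)) as Snapshot;
}
```

Because every field is already a string or `null`, the value read back on the drain side has the same shape as the value written, with no `Date` revival step needed.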
