Skip to content

Commit 51c982f

Browse files
d-csclaude
andcommitted
perf(webapp): shard the mollifier stale sweep to bound per-tick work
The previous sweep was unbounded along two dimensions: every tick walked every org and every env (via buffer.listOrgs + listEnvsForOrg). At the sweep's default per-env entry cap of 1000, an incident-scale fan-out gave O(orgs * envs * 1000) Redis round-trips per tick — running far longer than the 5-minute interval and triggering the inFlight guard to drop every subsequent tick until the slow pass finished. Shard the work via a durable cursor: - New file `mollifierStaleSweepState.server.ts` owns three Redis keys (`mollifier:stale_sweep:{cursor,org_list,counts}`), all under the mollifier namespace but separated from the buffer's own state. The state class has its own Redis client; the buffer's existing `MollifierBuffer` API surface is untouched. - On `cursor === 0` the org list is rebuilt by snapshotting `buffer.listOrgs()` into the frozen LIST — the cycle's frozen view. - Each tick consumes up to `maxOrgsPerPass` orgs (default 100), processes them, and advances the cursor. When the cursor reaches the end of the LIST it wraps to 0; the next tick rebuilds and starts the next cycle. - The per-env counts HASH is the source of truth for the gauge snapshot. Visiting an env with zero stale entries HDEL's its hash field — gauge clears immediately on revisit. Envs not revisited this tick keep their last-known value (durability across ticks AND across webapp restarts), accepting a worst-case lag of one full cursor cycle before a no-longer-stale env clears. Snapshot contract change: only envs with non-zero stale counts appear in the reported `Map`. The telemetry layer (`mollifierTelemetry.server.ts` `reportStaleEntrySnapshot`) sums values, so absence is equivalent to zero for the gauge — the alert behaviour is unchanged. Tests: - New: "shards work across ticks: cursor advances by maxOrgsPerPass and wraps after a full cycle" — drives a 5-org fixture with cap=2, asserts the cursor's three-tick progression and wrap. - New: "clears an env from the durable snapshot on revisit when it has entries but none currently stale" — same entry flips stale→not-stale by varying the sweep's `now`, asserts HDEL on revisit. - Existing tests updated to inject `state`; one assertion shape rewritten ("snapshot omits envs that have entries but none stale") to match the new HDEL semantics. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b7de986 commit 51c982f

4 files changed

Lines changed: 446 additions & 66 deletions

File tree

apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts

Lines changed: 91 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
import type { MollifierBuffer } from "@trigger.dev/redis-worker";
22
import { logger as defaultLogger } from "~/services/logger.server";
33
import { getMollifierBuffer } from "./mollifierBuffer.server";
4+
import { MollifierStaleSweepState, type StaleSweepStateStore } from "./mollifierStaleSweepState.server";
45
import {
56
recordStaleEntry as defaultRecordStaleEntry,
67
reportStaleEntrySnapshot as defaultReportStaleEntrySnapshot,
78
} from "./mollifierTelemetry.server";
89

9-
// One pass of the sweep scans every env's queue LIST. The per-env page
10-
// is bounded so a single pathological env can't make the sweep run
11-
// unboundedly long.
10+
// One pass of the sweep scans a bounded slice of orgs from the buffer's
11+
// queue LIST, identified by a durable cursor in Redis. Per-env entry
12+
// scan is also bounded so a single pathological env can't extend the
13+
// pass.
1214
const DEFAULT_MAX_ENTRIES_PER_ENV = 1000;
15+
// Max orgs visited per tick. Together with `maxEntriesPerEnv` this
16+
// caps Redis traffic per pass. One "cycle" (visiting every org once)
17+
// takes `ceil(N_orgs / cap)` ticks, after which the cursor wraps and a
18+
// fresh org list is taken.
19+
const DEFAULT_MAX_ORGS_PER_PASS = 100;
1320

1421
export type StaleSweepConfig = {
1522
// Entries whose dwell exceeds this threshold are flagged stale. Set
@@ -18,10 +25,21 @@ export type StaleSweepConfig = {
1825
// matches the cadence in the plan doc.
1926
staleThresholdMs: number;
2027
maxEntriesPerEnv?: number;
28+
// Hard cap on orgs visited per tick. Bounds the per-pass Redis traffic
29+
// and wall-time. Default 100 — at typical fleet sizes one or two
30+
// ticks cover everyone; under incident-scale fan-out a full cycle
31+
// takes a handful of ticks (~minutes) which is still well below the
32+
// staleness signal latency that ops cares about.
33+
maxOrgsPerPass?: number;
2134
};
2235

2336
export type StaleSweepDeps = {
2437
getBuffer?: () => MollifierBuffer | null;
38+
// Durable cursor + per-env counts hash. Required: the sweep is
39+
// useless without persistent state across ticks. The webapp wires up
40+
// a real `MollifierStaleSweepState`; tests pass one constructed
41+
// against the test container.
42+
state: StaleSweepStateStore;
2543
// No `envId` arg — `envId` is a high-cardinality metric attribute and
2644
// is intentionally not emitted as a metric label. The structured warn
2745
// log below carries envId for forensic drill-down.
@@ -38,15 +56,32 @@ export type StaleSweepResult = {
3856
staleCount: number;
3957
};
4058

41-
// Walks orgs → envs → entries, emitting an OTel counter tick and a
42-
// structured warning log for each buffer entry whose dwell exceeds the
43-
// stale threshold. Read-only: the sweep does NOT remove or salvage
44-
// entries; that decision is deferred to a separate retention-policy
45-
// change. The signal here exists so ops sees the drainer falling
59+
// Walks a bounded slice of `orgs → envs → entries`, emitting an OTel
60+
// counter tick and a structured warning log for each buffer entry whose
61+
// dwell exceeds the stale threshold. Read-only on the buffer's own
62+
// state; writes only to the sweep's three dedicated keys
63+
// (`mollifier:stale_sweep:*`). The sweep does NOT remove or salvage
64+
// buffer entries; that decision is deferred to a separate retention-
65+
// policy change. The signal here exists so ops sees the drainer falling
4666
// behind well before TTL-induced loss kicks in.
67+
//
68+
// Sharding contract:
69+
// - Cursor starts at 0. On cursor=0 the org list is refreshed by
70+
// snapshotting `buffer.listOrgs()` into the durable LIST — that is
71+
// the cycle's frozen view of orgs to visit.
72+
// - Each tick consumes up to `maxOrgsPerPass` orgs from the LIST,
73+
// advances the cursor, and persists.
74+
// - When the cursor reaches the end of the LIST it wraps to 0; the next
75+
// tick rebuilds the org list, capturing any orgs that joined the
76+
// buffer mid-cycle.
77+
// - The per-env counts HASH carries over across ticks: an env visited
78+
// on tick N and not revisited until tick N+M keeps its last-known
79+
// stale count in the gauge for that window. This is the price of
80+
// sharding — accepted because the alternative (re-scan everything
81+
// every tick) does not bound work.
4782
export async function runStaleSweepOnce(
4883
config: StaleSweepConfig,
49-
deps: StaleSweepDeps = {},
84+
deps: StaleSweepDeps,
5085
): Promise<StaleSweepResult> {
5186
const getBuffer = deps.getBuffer ?? getMollifierBuffer;
5287
const recordStale = deps.recordStaleEntry ?? defaultRecordStaleEntry;
@@ -55,27 +90,40 @@ export async function runStaleSweepOnce(
5590
const log = deps.logger ?? defaultLogger;
5691
const now = (deps.now ?? Date.now)();
5792
const maxEntries = config.maxEntriesPerEnv ?? DEFAULT_MAX_ENTRIES_PER_ENV;
93+
const maxOrgsPerPass = config.maxOrgsPerPass ?? DEFAULT_MAX_ORGS_PER_PASS;
5894

5995
const buffer = getBuffer();
6096
if (!buffer) {
6197
// Replace any previous snapshot with empty so a previously-paging
6298
// env doesn't stay latched if mollifier is turned off mid-flight.
99+
// Also clear the durable state so a re-enable starts from a clean
100+
// slate instead of resuming on a stale cursor.
101+
await deps.state.clearAll();
63102
reportSnapshot(new Map());
64103
return { orgsScanned: 0, envsScanned: 0, entriesScanned: 0, staleCount: 0 };
65104
}
66105

67-
const orgs = await buffer.listOrgs();
106+
let cursor = await deps.state.readCursor();
107+
if (cursor === 0) {
108+
// Fresh cycle — capture the current set of orgs into the frozen
109+
// LIST. Any orgs that join after this snapshot wait until the next
110+
// cycle to be visited. Acceptable for an observational sweep; the
111+
// staleness signal would only fire on entries that have been
112+
// dwelling for `staleThresholdMs` anyway, so they're not new.
113+
const orgs = await buffer.listOrgs();
114+
await deps.state.rebuildOrgList(orgs);
115+
}
116+
117+
const { orgs: slice, total } = await deps.state.readOrgListSlice(
118+
cursor,
119+
maxOrgsPerPass,
120+
);
121+
68122
let envsScanned = 0;
69123
let entriesScanned = 0;
70124
let staleCount = 0;
71-
// Tracks the stale count per env this pass. Includes zero counts for
72-
// envs that have entries but none stale — that's what lets the gauge
73-
// drop back to 0 when the drainer catches up. Envs absent from this
74-
// map are also absent from the new snapshot, clearing any latched
75-
// alerts on envs that have fully drained.
76-
const perEnvStale = new Map<string, number>();
77-
78-
for (const orgId of orgs) {
125+
126+
for (const orgId of slice) {
79127
const envs = await buffer.listEnvsForOrg(orgId);
80128
for (const envId of envs) {
81129
envsScanned += 1;
@@ -96,18 +144,31 @@ export async function runStaleSweepOnce(
96144
envStale += 1;
97145
}
98146
}
99-
perEnvStale.set(envId, envStale);
147+
// Persist the per-env count to the durable hash. HSET when stale
148+
// > 0, HDEL when it dropped back to zero — the hash is the source
149+
// of truth for the gauge snapshot below.
150+
await deps.state.setEnvStaleCount(envId, envStale);
100151
staleCount += envStale;
101152
}
102153
}
103154

104-
reportSnapshot(perEnvStale);
155+
// Advance the cursor. If the slice consumed the end of the LIST, wrap
156+
// to 0 so the next tick rebuilds the org list and starts a new cycle.
157+
const advanced = cursor + slice.length;
158+
const newCursor = advanced >= total ? 0 : advanced;
159+
await deps.state.writeCursor(newCursor);
160+
161+
// Emit the snapshot from the durable hash, which carries values for
162+
// envs visited in earlier ticks too. This is what makes the gauge
163+
// stable across ticks (and across webapp restarts).
164+
const snapshot = await deps.state.readAllEnvStaleCounts();
165+
reportSnapshot(snapshot);
105166

106-
return { orgsScanned: orgs.length, envsScanned, entriesScanned, staleCount };
167+
return { orgsScanned: slice.length, envsScanned, entriesScanned, staleCount };
107168
}
108169

109170
export type StaleSweepIntervalHandle = {
110-
stop: () => void;
171+
stop: () => Promise<void>;
111172
};
112173

113174
// Production wrapper: schedule `runStaleSweepOnce` on a fixed interval.
@@ -116,7 +177,7 @@ export type StaleSweepIntervalHandle = {
116177
// overlapping sweeps that all log the same stale entries).
117178
export function startStaleSweepInterval(
118179
config: StaleSweepConfig & { intervalMs: number },
119-
deps: StaleSweepDeps = {},
180+
deps: StaleSweepDeps,
120181
): StaleSweepIntervalHandle {
121182
let stopped = false;
122183
let inFlight = false;
@@ -141,9 +202,15 @@ export function startStaleSweepInterval(
141202
}, config.intervalMs);
142203

143204
return {
144-
stop: () => {
205+
stop: async () => {
145206
stopped = true;
146207
clearInterval(timer);
208+
// Close the durable-state Redis client if the deps own a real
209+
// `MollifierStaleSweepState`. Tests may inject a fake without a
210+
// `close()`; guard accordingly.
211+
if (deps.state instanceof MollifierStaleSweepState) {
212+
await deps.state.close();
213+
}
147214
},
148215
};
149216
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis";
2+
import { Logger } from "@trigger.dev/core/logger";
3+
4+
// Durable per-tick state for the sharded stale sweep. Three Redis keys,
5+
// all in the `mollifier:` namespace alongside the buffer's own state:
6+
//
7+
// mollifier:stale_sweep:cursor STRING next position in org_list (0 = fresh cycle)
8+
// mollifier:stale_sweep:org_list LIST org IDs frozen at the start of the cycle
9+
// mollifier:stale_sweep:counts HASH envId -> last-known stale count
10+
//
11+
// The state survives webapp restarts: a restarted process picks up the
12+
// cursor where the previous one left off and re-emits the last-known
13+
// gauge values immediately, rather than blinking to zero until the next
14+
// cycle visits each env.
15+
//
16+
// Storage is owned by this class rather than added to MollifierBuffer
17+
// because the keys are sweep-internal — the buffer abstracts the
18+
// drainer/queue state, this abstracts sweep state. They share a
19+
// namespace prefix but no API surface.
20+
21+
export interface StaleSweepStateStore {
22+
readCursor(): Promise<number>;
23+
writeCursor(value: number): Promise<void>;
24+
/** Replaces the cycle's frozen org_list. Called at cursor=0. */
25+
rebuildOrgList(orgs: string[]): Promise<void>;
26+
/** Returns up to `count` org IDs starting at `start`, plus the LIST's total length. */
27+
readOrgListSlice(start: number, count: number): Promise<{ orgs: string[]; total: number }>;
28+
/** HSET when count > 0, HDEL when count === 0 (so the snapshot reflects current truth). */
29+
setEnvStaleCount(envId: string, count: number): Promise<void>;
30+
readAllEnvStaleCounts(): Promise<Map<string, number>>;
31+
clearAll(): Promise<void>;
32+
close(): Promise<void>;
33+
}
34+
35+
const CURSOR_KEY = "mollifier:stale_sweep:cursor";
36+
const ORG_LIST_KEY = "mollifier:stale_sweep:org_list";
37+
const COUNTS_KEY = "mollifier:stale_sweep:counts";
38+
39+
export class MollifierStaleSweepState implements StaleSweepStateStore {
40+
private readonly redis: Redis;
41+
private readonly logger: Logger;
42+
43+
constructor(options: { redisOptions: RedisOptions; logger?: Logger }) {
44+
this.logger = options.logger ?? new Logger("MollifierStaleSweepState", "debug");
45+
this.redis = createRedisClient(
46+
{ ...options.redisOptions, maxRetriesPerRequest: 20 },
47+
{
48+
onError: (error) => {
49+
this.logger.error("MollifierStaleSweepState redis client error:", { error });
50+
},
51+
},
52+
);
53+
}
54+
55+
async readCursor(): Promise<number> {
56+
const raw = await this.redis.get(CURSOR_KEY);
57+
if (raw === null) return 0;
58+
const n = Number.parseInt(raw, 10);
59+
return Number.isFinite(n) && n >= 0 ? n : 0;
60+
}
61+
62+
async writeCursor(value: number): Promise<void> {
63+
await this.redis.set(CURSOR_KEY, String(value));
64+
}
65+
66+
async rebuildOrgList(orgs: string[]): Promise<void> {
67+
// DEL + RPUSH in a pipeline — close enough to atomic for an
68+
// observational sweep (the inFlight guard at startStaleSweepInterval
69+
// serialises sweep passes; nothing else writes these keys).
70+
const pipeline = this.redis.pipeline();
71+
pipeline.del(ORG_LIST_KEY);
72+
if (orgs.length > 0) {
73+
pipeline.rpush(ORG_LIST_KEY, ...orgs);
74+
}
75+
await pipeline.exec();
76+
}
77+
78+
async readOrgListSlice(
79+
start: number,
80+
count: number,
81+
): Promise<{ orgs: string[]; total: number }> {
82+
const pipeline = this.redis.pipeline();
83+
pipeline.lrange(ORG_LIST_KEY, start, start + count - 1);
84+
pipeline.llen(ORG_LIST_KEY);
85+
const results = await pipeline.exec();
86+
if (!results) return { orgs: [], total: 0 };
87+
const [lrangeErr, lrangeRes] = results[0] as [Error | null, string[] | null];
88+
const [llenErr, llenRes] = results[1] as [Error | null, number | null];
89+
if (lrangeErr || llenErr) {
90+
this.logger.error("MollifierStaleSweepState.readOrgListSlice failed", {
91+
lrangeErr: lrangeErr?.message,
92+
llenErr: llenErr?.message,
93+
});
94+
return { orgs: [], total: 0 };
95+
}
96+
return { orgs: lrangeRes ?? [], total: llenRes ?? 0 };
97+
}
98+
99+
async setEnvStaleCount(envId: string, count: number): Promise<void> {
100+
if (count > 0) {
101+
await this.redis.hset(COUNTS_KEY, envId, String(count));
102+
} else {
103+
await this.redis.hdel(COUNTS_KEY, envId);
104+
}
105+
}
106+
107+
async readAllEnvStaleCounts(): Promise<Map<string, number>> {
108+
const raw = await this.redis.hgetall(COUNTS_KEY);
109+
const out = new Map<string, number>();
110+
for (const [envId, value] of Object.entries(raw)) {
111+
const n = Number.parseInt(value, 10);
112+
if (Number.isFinite(n)) out.set(envId, n);
113+
}
114+
return out;
115+
}
116+
117+
async clearAll(): Promise<void> {
118+
await this.redis.del(CURSOR_KEY, ORG_LIST_KEY, COUNTS_KEY);
119+
}
120+
121+
async close(): Promise<void> {
122+
await this.redis.quit();
123+
}
124+
}

apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
startStaleSweepInterval,
66
type StaleSweepIntervalHandle,
77
} from "./mollifier/mollifierStaleSweep.server";
8+
import { MollifierStaleSweepState } from "./mollifier/mollifierStaleSweepState.server";
89

910
declare global {
1011
// eslint-disable-next-line no-var
@@ -35,13 +36,38 @@ export function initMollifierStaleSweepWorker(): void {
3536
staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS,
3637
});
3738

38-
const handle = startStaleSweepInterval({
39-
intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS,
40-
staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS,
39+
// Construct the sweep's durable-state Redis client using the same
40+
// mollifier-Redis credentials as the buffer. Keeping this client
41+
// separate from the buffer's own client keeps state ownership clean:
42+
// the buffer abstracts queue/entry state, this abstracts sweep state.
43+
const state = new MollifierStaleSweepState({
44+
redisOptions: {
45+
keyPrefix: "",
46+
host: env.TRIGGER_MOLLIFIER_REDIS_HOST,
47+
port: env.TRIGGER_MOLLIFIER_REDIS_PORT,
48+
username: env.TRIGGER_MOLLIFIER_REDIS_USERNAME,
49+
password: env.TRIGGER_MOLLIFIER_REDIS_PASSWORD,
50+
enableAutoPipelining: true,
51+
...(env.TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
52+
},
4153
});
4254

43-
signalsEmitter.on("SIGTERM", handle.stop);
44-
signalsEmitter.on("SIGINT", handle.stop);
55+
const handle = startStaleSweepInterval(
56+
{
57+
intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS,
58+
staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS,
59+
},
60+
{ state },
61+
);
62+
63+
// `handle.stop` is now async (it closes the Redis client). The signals
64+
// emitter swallows promise rejections from listeners, so wrap it in a
65+
// void-returning shim to be explicit about discarding the promise.
66+
const onShutdown = (): void => {
67+
void handle.stop();
68+
};
69+
signalsEmitter.on("SIGTERM", onShutdown);
70+
signalsEmitter.on("SIGINT", onShutdown);
4571
global.__mollifierStaleSweepRegistered__ = true;
4672
global.__mollifierStaleSweepHandle__ = handle;
4773
}

0 commit comments

Comments
 (0)