Skip to content

Commit 612de3b

Browse files
d-csclaude
andcommitted
refactor(redis-worker): mollifier queue ZSET -> LIST, drop dead watermark listing
The watermark listing (listForEnvWithWatermark) had zero consumers across the whole stack — the dashboard renders buffered runs per-run via getEntry, never a paginated queue list. With it gone, nothing left needs a sorted set: FIFO drain is just insertion order, the stale sweep enumerates without caring about order, and delay is materialised into PG at drain time (never a queue sort key). Revert the per-env queue to a Redis LIST: LPUSH on accept, RPOP on pop (FIFO), RPUSH on requeue (transiently-failed entry pops next), LLEN for the empty check, LRANGE for the stale-sweep enumeration. O(1) instead of O(log N), simpler Lua. createdAtMicros stays a hash field for dwell metrics; it is no longer a sort key. Removes listForEnvWithWatermark + its tests; rewrites the ZSET-storage tests as LIST-storage tests; updates orphan/requeue tests for RPOP/RPUSH. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8febda2 commit 612de3b

3 files changed

Lines changed: 72 additions & 332 deletions

File tree

.changeset/mollifier-buffer-extensions.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
"@trigger.dev/redis-worker": minor
33
---
44

5-
Mollifier buffer extensions: idempotency dedup, an atomic `mutateSnapshot` API, metadata CAS, paginated listing, claim primitives, and a `MollifierSnapshot` type.
5+
Mollifier buffer extensions: idempotency dedup, an atomic `mutateSnapshot` API, metadata CAS, claim primitives, and a `MollifierSnapshot` type.

packages/redis-worker/src/mollifier/buffer.test.ts

Lines changed: 42 additions & 228 deletions
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,8 @@ describe("MollifierBuffer.pop orphan handling", () => {
238238
});
239239

240240
try {
241-
// Simulate a TTL-expired orphan: queue ref exists, entry hash does not.
242-
await buffer["redis"].zadd("mollifier:queue:env_a", 1, "run_orphan");
241+
// Simulate an evicted orphan: queue ref exists, entry hash does not.
242+
await buffer["redis"].rpush("mollifier:queue:env_a", "run_orphan");
243243

244244
const popped = await buffer.pop("env_a");
245245
expect(popped).toBeNull();
@@ -249,7 +249,7 @@ describe("MollifierBuffer.pop orphan handling", () => {
249249
expect(Object.keys(raw)).toHaveLength(0);
250250

251251
// Queue is drained — the loop pops orphans until empty.
252-
const qLen = await buffer["redis"].zcard("mollifier:queue:env_a");
252+
const qLen = await buffer["redis"].llen("mollifier:queue:env_a");
253253
expect(qLen).toBe(0);
254254
} finally {
255255
await buffer.close();
@@ -271,20 +271,21 @@ describe("MollifierBuffer.pop orphan handling", () => {
271271
});
272272

273273
try {
274-
// Layout by score (lowest-first, since ZPOPMIN takes the min):
275-
// orphan_a (score 1) → valid (score = its createdAtMicros, large) → orphan_b (score 1e18).
276-
// First pop skips orphan_a, returns valid; orphan_b remains.
277-
await buffer["redis"].zadd("mollifier:queue:env_a", 1, "orphan_a");
274+
// Build the queue so RPOP (tail-first) yields: orphan_a, valid,
275+
// orphan_b. accept LPUSHes "valid"; RPUSH puts orphan_a at the
276+
// tail (popped first), LPUSH puts orphan_b at the head (popped
277+
// last). First pop skips orphan_a, returns valid; orphan_b remains.
278278
await buffer.accept({ runId: "valid", envId: "env_a", orgId: "org_1", payload: "{}" });
279-
await buffer["redis"].zadd("mollifier:queue:env_a", 1e18, "orphan_b");
279+
await buffer["redis"].rpush("mollifier:queue:env_a", "orphan_a");
280+
await buffer["redis"].lpush("mollifier:queue:env_a", "orphan_b");
280281

281282
const popped = await buffer.pop("env_a");
282283
expect(popped).not.toBeNull();
283284
expect(popped!.runId).toBe("valid");
284285
expect(popped!.status).toBe("DRAINING");
285286

286287
// The trailing orphan_b is still in the queue (single pop call).
287-
const remaining = await buffer["redis"].zcard("mollifier:queue:env_a");
288+
const remaining = await buffer["redis"].llen("mollifier:queue:env_a");
288289
expect(remaining).toBe(1);
289290

290291
// A second pop drains the trailing orphan_b. The queue is now
@@ -559,13 +560,14 @@ describe("MollifierBuffer.requeue on missing entry", () => {
559560

560561
describe("MollifierBuffer.requeue ordering", () => {
561562
redisTest(
562-
"requeued entry retains its original createdAt and pops next (oldest-first by createdAt)",
563+
"requeued entry pops next (RPUSH to the RPOP/tail end), preserving FIFO",
563564
{ timeout: 20_000 },
564565
async ({ redisContainer }) => {
565-
// Score == createdAtMicros; requeue does not bump the score. The
566-
// oldest entry continues to pop first across retries. `maxAttempts`
567-
// in the drainer bounds the retry loop for a persistently failing
568-
// entry (after which it goes to the `fail` path, not requeue).
566+
// LIST FIFO: accept LPUSHes at the head, pop RPOPs from the tail, so
567+
// the first-accepted entry pops first. requeue RPUSHes back to the
568+
// tail, so a transiently failed entry pops next rather than going to
569+
// the back. `maxAttempts` in the drainer bounds the retry loop for a
570+
// persistently failing entry (after which it goes to `fail`, not requeue).
569571
const buffer = new MollifierBuffer({
570572
redisOptions: {
571573
host: redisContainer.getHost(),
@@ -577,17 +579,15 @@ describe("MollifierBuffer.requeue ordering", () => {
577579

578580
try {
579581
await buffer.accept({ runId: "a", envId: "env_a", orgId: "org_1", payload: "{}" });
580-
await new Promise((r) => setTimeout(r, 2));
581582
await buffer.accept({ runId: "b", envId: "env_a", orgId: "org_1", payload: "{}" });
582-
await new Promise((r) => setTimeout(r, 2));
583583
await buffer.accept({ runId: "c", envId: "env_a", orgId: "org_1", payload: "{}" });
584584

585585
const first = await buffer.pop("env_a");
586586
expect(first!.runId).toBe("a");
587587

588588
await buffer.requeue("a");
589589

590-
// a still has the smallest createdAtMicros → pops next.
590+
// a was RPUSHed back to the tail → pops next, ahead of b and c.
591591
const next = await buffer.pop("env_a");
592592
expect(next!.runId).toBe("a");
593593
const after = await buffer.pop("env_a");
@@ -2045,9 +2045,9 @@ describe("MollifierBuffer.mutateSnapshot", () => {
20452045
);
20462046
});
20472047

2048-
describe("MollifierBuffer ZSET storage", () => {
2048+
describe("MollifierBuffer LIST storage", () => {
20492049
redisTest(
2050-
"queue key is a ZSET scored by entry's createdAtMicros",
2050+
"queue key is a LIST; createdAtMicros is a hash field, not a sort key",
20512051
{ timeout: 20_000 },
20522052
async ({ redisContainer }) => {
20532053
const buffer = new MollifierBuffer({
@@ -2062,32 +2062,29 @@ describe("MollifierBuffer ZSET storage", () => {
20622062
try {
20632063
await buffer.accept({ runId: "z1", envId: "env_z", orgId: "org_1", payload: "{}" });
20642064

2065-
// ZSET-only commands must succeed against the queue key.
2066-
const card = await buffer["redis"].zcard("mollifier:queue:env_z");
2067-
expect(card).toBe(1);
2065+
// LIST-only commands must succeed against the queue key.
2066+
const len = await buffer["redis"].llen("mollifier:queue:env_z");
2067+
expect(len).toBe(1);
2068+
const members = await buffer["redis"].lrange("mollifier:queue:env_z", 0, -1);
2069+
expect(members).toEqual(["z1"]);
20682070

2069-
const score = await buffer["redis"].zscore("mollifier:queue:env_z", "z1");
2070-
expect(score).not.toBeNull();
2071-
const scoreNum = Number(score);
2072-
expect(Number.isFinite(scoreNum)).toBe(true);
2071+
// The queue holds no score — it's not a ZSET.
2072+
await expect(buffer["redis"].zscore("mollifier:queue:env_z", "z1")).rejects.toThrow();
20732073

2074-
// Score matches the entry hash's createdAtMicros field.
2075-
const micros = await buffer["redis"].hget("mollifier:entries:z1", "createdAtMicros");
2076-
expect(micros).not.toBeNull();
2077-
expect(Number(micros)).toBe(scoreNum);
2078-
2079-
// Score is plausibly recent (within last minute as microseconds).
2074+
// createdAtMicros lives on the entry hash (for dwell metrics) and
2075+
// is plausibly recent (within the last minute, as microseconds).
2076+
const micros = Number(await buffer["redis"].hget("mollifier:entries:z1", "createdAtMicros"));
20802077
const nowMicros = Date.now() * 1000;
2081-
expect(scoreNum).toBeGreaterThan(nowMicros - 60_000_000);
2082-
expect(scoreNum).toBeLessThanOrEqual(nowMicros + 1_000_000);
2078+
expect(micros).toBeGreaterThan(nowMicros - 60_000_000);
2079+
expect(micros).toBeLessThanOrEqual(nowMicros + 1_000_000);
20832080
} finally {
20842081
await buffer.close();
20852082
}
20862083
},
20872084
);
20882085

20892086
redisTest(
2090-
"pop returns entries in ascending createdAtMicros order (FIFO by time, not by member)",
2087+
"pop returns entries in FIFO insertion order (independent of member lex order)",
20912088
{ timeout: 20_000 },
20922089
async ({ redisContainer }) => {
20932090
const buffer = new MollifierBuffer({
@@ -2100,11 +2097,10 @@ describe("MollifierBuffer ZSET storage", () => {
21002097
});
21012098

21022099
try {
2103-
// Insert runIds in reverse-lex order to prove ordering is by score, not member.
2100+
// Accept in reverse-lex order to prove ordering is by insertion
2101+
// (LPUSH head / RPOP tail), not by member value.
21042102
await buffer.accept({ runId: "zzz", envId: "env_o", orgId: "org_1", payload: "{}" });
2105-
await new Promise((r) => setTimeout(r, 5));
21062103
await buffer.accept({ runId: "mmm", envId: "env_o", orgId: "org_1", payload: "{}" });
2107-
await new Promise((r) => setTimeout(r, 5));
21082104
await buffer.accept({ runId: "aaa", envId: "env_o", orgId: "org_1", payload: "{}" });
21092105

21102106
const first = await buffer.pop("env_o");
@@ -2120,7 +2116,7 @@ describe("MollifierBuffer ZSET storage", () => {
21202116
);
21212117

21222118
redisTest(
2123-
"requeue keeps original score; createdAt is immutable across retries",
2119+
"requeue re-enqueues to the LIST; createdAt is immutable across retries",
21242120
{ timeout: 20_000 },
21252121
async ({ redisContainer }) => {
21262122
const buffer = new MollifierBuffer({
@@ -2134,24 +2130,17 @@ describe("MollifierBuffer ZSET storage", () => {
21342130

21352131
try {
21362132
await buffer.accept({ runId: "rq", envId: "env_rq", orgId: "org_1", payload: "{}" });
2137-
const originalScore = Number(
2138-
await buffer["redis"].zscore("mollifier:queue:env_rq", "rq"),
2139-
);
2140-
const originalMicros = Number(
2141-
await buffer["redis"].hget("mollifier:entries:rq", "createdAtMicros"),
2142-
);
2133+
const originalMicros = await buffer["redis"].hget("mollifier:entries:rq", "createdAtMicros");
21432134

21442135
await buffer.pop("env_rq");
2145-
await new Promise((r) => setTimeout(r, 5));
2136+
// Queue is empty after the pop.
2137+
expect(await buffer["redis"].llen("mollifier:queue:env_rq")).toBe(0);
2138+
21462139
await buffer.requeue("rq");
21472140

2148-
const newScore = Number(
2149-
await buffer["redis"].zscore("mollifier:queue:env_rq", "rq"),
2150-
);
2151-
const newMicros = Number(
2152-
await buffer["redis"].hget("mollifier:entries:rq", "createdAtMicros"),
2153-
);
2154-
expect(newScore).toBe(originalScore);
2141+
// Back on the LIST, and createdAtMicros is unchanged.
2142+
expect(await buffer["redis"].lrange("mollifier:queue:env_rq", 0, -1)).toEqual(["rq"]);
2143+
const newMicros = await buffer["redis"].hget("mollifier:entries:rq", "createdAtMicros");
21552144
expect(newMicros).toBe(originalMicros);
21562145
} finally {
21572146
await buffer.close();
@@ -2234,181 +2223,6 @@ describe("MollifierBuffer.listEntriesForEnv", () => {
22342223
});
22352224
});
22362225

2237-
describe("MollifierBuffer.listForEnvWithWatermark", () => {
2238-
// Seed a QUEUED entry, then pin its ZSET score and hash `createdAtMicros`
2239-
// to a deterministic value so ordering and the watermark cursor don't
2240-
// depend on wall-clock timing (Date.now() ties within a millisecond).
2241-
async function seed(buffer: MollifierBuffer, envId: string, runId: string, micros: number) {
2242-
await buffer.accept({ runId, envId, orgId: "org_1", payload: "{}" });
2243-
await buffer["redis"].zadd(`mollifier:queue:${envId}`, String(micros), runId);
2244-
await buffer["redis"].hset(`mollifier:entries:${runId}`, "createdAtMicros", String(micros));
2245-
}
2246-
2247-
redisTest("pageSize <= 0 returns empty without hitting redis", { timeout: 20_000 }, async ({ redisContainer }) => {
2248-
const buffer = new MollifierBuffer({
2249-
redisOptions: {
2250-
host: redisContainer.getHost(),
2251-
port: redisContainer.getPort(),
2252-
password: redisContainer.getPassword(),
2253-
},
2254-
logger: new Logger("test", "log"),
2255-
});
2256-
try {
2257-
expect(await buffer.listForEnvWithWatermark({ envId: "env_w", pageSize: 0 })).toEqual([]);
2258-
expect(await buffer.listForEnvWithWatermark({ envId: "env_w", pageSize: -3 })).toEqual([]);
2259-
} finally {
2260-
await buffer.close();
2261-
}
2262-
});
2263-
2264-
redisTest(
2265-
"page 1 returns newest-first up to pageSize without consuming entries",
2266-
{ timeout: 20_000 },
2267-
async ({ redisContainer }) => {
2268-
const buffer = new MollifierBuffer({
2269-
redisOptions: {
2270-
host: redisContainer.getHost(),
2271-
port: redisContainer.getPort(),
2272-
password: redisContainer.getPassword(),
2273-
},
2274-
logger: new Logger("test", "log"),
2275-
});
2276-
try {
2277-
await seed(buffer, "env_w", "wa", 1000);
2278-
await seed(buffer, "env_w", "wb", 2000);
2279-
await seed(buffer, "env_w", "wc", 3000);
2280-
2281-
const page = await buffer.listForEnvWithWatermark({ envId: "env_w", pageSize: 2 });
2282-
// Newest-first by createdAtMicros.
2283-
expect(page.map((e) => e.runId)).toEqual(["wc", "wb"]);
2284-
2285-
// Non-destructive: drainer still pops oldest-first.
2286-
const popped: string[] = [];
2287-
for (let i = 0; i < 3; i++) {
2288-
const e = await buffer.pop("env_w");
2289-
if (e) popped.push(e.runId);
2290-
}
2291-
expect(popped).toEqual(["wa", "wb", "wc"]);
2292-
} finally {
2293-
await buffer.close();
2294-
}
2295-
},
2296-
);
2297-
2298-
redisTest(
2299-
"page N continues strictly below the watermark score",
2300-
{ timeout: 20_000 },
2301-
async ({ redisContainer }) => {
2302-
const buffer = new MollifierBuffer({
2303-
redisOptions: {
2304-
host: redisContainer.getHost(),
2305-
port: redisContainer.getPort(),
2306-
password: redisContainer.getPassword(),
2307-
},
2308-
logger: new Logger("test", "log"),
2309-
});
2310-
try {
2311-
await seed(buffer, "env_w", "wa", 1000);
2312-
await seed(buffer, "env_w", "wb", 2000);
2313-
await seed(buffer, "env_w", "wc", 3000);
2314-
2315-
const page1 = await buffer.listForEnvWithWatermark({ envId: "env_w", pageSize: 2 });
2316-
expect(page1.map((e) => e.runId)).toEqual(["wc", "wb"]);
2317-
2318-
const last = page1[page1.length - 1]!;
2319-
const page2 = await buffer.listForEnvWithWatermark({
2320-
envId: "env_w",
2321-
pageSize: 2,
2322-
watermark: { createdAtMicros: last.createdAtMicros, runId: last.runId },
2323-
});
2324-
// Only the entry strictly below score 2000 remains; no overlap.
2325-
expect(page2.map((e) => e.runId)).toEqual(["wa"]);
2326-
} finally {
2327-
await buffer.close();
2328-
}
2329-
},
2330-
);
2331-
2332-
redisTest(
2333-
"tied-score watermark surfaces lex-smaller members on the next page without dupes",
2334-
{ timeout: 20_000 },
2335-
async ({ redisContainer }) => {
2336-
const buffer = new MollifierBuffer({
2337-
redisOptions: {
2338-
host: redisContainer.getHost(),
2339-
port: redisContainer.getPort(),
2340-
password: redisContainer.getPassword(),
2341-
},
2342-
logger: new Logger("test", "log"),
2343-
});
2344-
try {
2345-
// Three entries share one score; ZSET breaks the tie by member,
2346-
// and zrevrangebyscore returns them member-DESC: tc, tb, ta.
2347-
await seed(buffer, "env_w", "ta", 2000);
2348-
await seed(buffer, "env_w", "tb", 2000);
2349-
await seed(buffer, "env_w", "tc", 2000);
2350-
2351-
const page1 = await buffer.listForEnvWithWatermark({ envId: "env_w", pageSize: 2 });
2352-
expect(page1.map((e) => e.runId)).toEqual(["tc", "tb"]);
2353-
2354-
const last = page1[page1.length - 1]!;
2355-
const page2 = await buffer.listForEnvWithWatermark({
2356-
envId: "env_w",
2357-
pageSize: 2,
2358-
watermark: { createdAtMicros: last.createdAtMicros, runId: last.runId },
2359-
});
2360-
// Same score, lex-smaller than the "tb" anchor — and not "tb"
2361-
// itself (no duplicate across the page boundary).
2362-
expect(page2.map((e) => e.runId)).toEqual(["ta"]);
2363-
} finally {
2364-
await buffer.close();
2365-
}
2366-
},
2367-
);
2368-
2369-
redisTest(
2370-
"skips orphan queue references (entry hash gone) during listing",
2371-
{ timeout: 20_000 },
2372-
async ({ redisContainer }) => {
2373-
const buffer = new MollifierBuffer({
2374-
redisOptions: {
2375-
host: redisContainer.getHost(),
2376-
port: redisContainer.getPort(),
2377-
password: redisContainer.getPassword(),
2378-
},
2379-
logger: new Logger("test", "log"),
2380-
});
2381-
try {
2382-
await seed(buffer, "env_w", "live", 1000);
2383-
await seed(buffer, "env_w", "orphan", 2000);
2384-
// Drop the orphan's hash but leave its queue ref behind.
2385-
await buffer["redis"].del("mollifier:entries:orphan");
2386-
2387-
const page = await buffer.listForEnvWithWatermark({ envId: "env_w", pageSize: 10 });
2388-
expect(page.map((e) => e.runId)).toEqual(["live"]);
2389-
} finally {
2390-
await buffer.close();
2391-
}
2392-
},
2393-
);
2394-
2395-
redisTest("returns empty for an env with no queued entries", { timeout: 20_000 }, async ({ redisContainer }) => {
2396-
const buffer = new MollifierBuffer({
2397-
redisOptions: {
2398-
host: redisContainer.getHost(),
2399-
port: redisContainer.getPort(),
2400-
password: redisContainer.getPassword(),
2401-
},
2402-
logger: new Logger("test", "log"),
2403-
});
2404-
try {
2405-
expect(await buffer.listForEnvWithWatermark({ envId: "env_empty_w", pageSize: 10 })).toEqual([]);
2406-
} finally {
2407-
await buffer.close();
2408-
}
2409-
});
2410-
});
2411-
24122226
// Composite-key safety. The Redis-key builders concatenate
24132227
// `(envId, taskIdentifier, idempotencyKey)` with `:` separators; without
24142228
// per-segment encoding, `taskIdentifier="a:b"` and `idempotencyKey="x"`

0 commit comments

Comments
 (0)