Skip to content

Commit d55d1dd

Browse files
d-csclaude
andcommitted
fix(redis-worker): jittered reconnect backoff for mollifier buffer client
The buffer client's ioredis retryStrategy used a fixed Math.min(times*50, 1000) schedule with no jitter, so a fleet of webapp instances reconnecting after the same Redis blip retried in lockstep and stampeded Redis on recovery (thundering herd) — the same lockstep-load pattern the mutate-fallback wait loop was changed to avoid. Extract mollifierReconnectDelayMs(times, random) and apply equal jitter: a uniform pick in [base/2, base]. Bounded by the original 1s cap, so it is never slower than before, just decorrelated. Pure unit tests pin the band, the cap, and that draws spread. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8bfa741 commit d55d1dd

3 files changed

Lines changed: 66 additions & 4 deletions

File tree

.changeset/mollifier-buffer-extensions.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
"@trigger.dev/redis-worker": minor
33
---
44

5-
Mollifier buffer extensions: idempotency dedup, an atomic `mutateSnapshot` API, metadata CAS, claim primitives, and a `MollifierSnapshot` type.
5+
Mollifier buffer extensions: idempotency dedup, an atomic `mutateSnapshot` API, metadata CAS, claim primitives, and a `MollifierSnapshot` type. The buffer's Redis client now reconnects with jittered backoff so a fleet of clients doesn't stampede Redis in lockstep after a blip.

packages/redis-worker/src/mollifier/buffer.test.ts

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,52 @@ import { describe, expect, it } from "vitest";
22
import { BufferEntrySchema, serialiseSnapshot, deserialiseSnapshot } from "./schemas.js";
33
import { redisTest } from "@internal/testcontainers";
44
import { Logger } from "@trigger.dev/core/logger";
5-
import { MollifierBuffer, idempotencyLookupKeyFor, makeIdempotencyClaimKey } from "./buffer.js";
5+
import {
6+
MollifierBuffer,
7+
idempotencyLookupKeyFor,
8+
makeIdempotencyClaimKey,
9+
mollifierReconnectDelayMs,
10+
} from "./buffer.js";
11+
12+
describe("mollifierReconnectDelayMs", () => {
13+
it("grows linearly with the attempt count and caps the base at 1s", () => {
14+
// random=()=>1 yields the top of the equal-jitter band (== base).
15+
const top = (times: number) => mollifierReconnectDelayMs(times, () => 1);
16+
expect(top(1)).toBe(50);
17+
expect(top(4)).toBe(200);
18+
expect(top(20)).toBe(1000);
19+
// Past the cap the base stays at 1000.
20+
expect(top(100)).toBe(1000);
21+
});
22+
23+
it("applies equal jitter: result is uniform in [base/2, base]", () => {
24+
// base for times=10 is 500, so the band is [250, 500].
25+
expect(mollifierReconnectDelayMs(10, () => 0)).toBe(250); // floor of band
26+
expect(mollifierReconnectDelayMs(10, () => 0.999999)).toBe(500); // top of band
27+
const mid = mollifierReconnectDelayMs(10, () => 0.5);
28+
expect(mid).toBeGreaterThanOrEqual(250);
29+
expect(mid).toBeLessThanOrEqual(500);
30+
});
31+
32+
it("never exceeds the original fixed-schedule envelope (strictly an improvement)", () => {
33+
for (const times of [1, 2, 5, 10, 20, 50]) {
34+
const cap = Math.min(times * 50, 1000);
35+
for (const r of [0, 0.25, 0.5, 0.75, 0.999999]) {
36+
const delay = mollifierReconnectDelayMs(times, () => r);
37+
expect(delay).toBeLessThanOrEqual(cap);
38+
expect(delay).toBeGreaterThanOrEqual(Math.floor(cap / 2));
39+
}
40+
}
41+
});
42+
43+
it("decorrelates concurrent reconnects (distinct values across random draws)", () => {
44+
const draws = [0.05, 0.3, 0.55, 0.8, 0.95].map((r) =>
45+
mollifierReconnectDelayMs(20, () => r),
46+
);
47+
// Lockstep would collapse to a single value; jitter spreads them.
48+
expect(new Set(draws).size).toBeGreaterThan(1);
49+
});
50+
});
651

752
describe("schemas", () => {
853
it("serialiseSnapshot then deserialiseSnapshot is identity for plain objects", () => {

packages/redis-worker/src/mollifier/buffer.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,24 @@ export type MollifierBufferOptions = {
1818
// have a safety net while PG replica lag settles. Q1 D2.
1919
const ACK_GRACE_TTL_SECONDS = 30;
2020

21+
// ioredis reconnect backoff for the mollifier buffer client. The base
22+
// grows linearly with the attempt count and is capped at 1s (the same
23+
// envelope as the previous fixed `Math.min(times * 50, 1000)` schedule).
24+
// We then apply equal jitter — a uniform pick in `[base/2, base]` — so a
25+
// fleet of webapp instances reconnecting after the same Redis blip don't
26+
// retry in lockstep and stampede Redis on recovery (thundering herd).
27+
// Because the jittered value never exceeds the original cap, this is never
28+
// slower than before — just decorrelated. Mirrors the jittered-backoff
29+
// approach the mutate-fallback wait loop adopted for the same reason.
30+
export function mollifierReconnectDelayMs(
31+
times: number,
32+
random: () => number = Math.random,
33+
): number {
34+
const base = Math.min(times * 50, 1000);
35+
const half = Math.floor(base / 2);
36+
return half + Math.round(random() * (base - half));
37+
}
38+
2139
export type SnapshotPatch =
2240
| { type: "append_tags"; tags: string[] }
2341
| { type: "set_metadata"; metadata: string; metadataType: string }
@@ -95,8 +113,7 @@ export class MollifierBuffer {
95113
{
96114
...options.redisOptions,
97115
retryStrategy(times) {
98-
const delay = Math.min(times * 50, 1000);
99-
return delay;
116+
return mollifierReconnectDelayMs(times);
100117
},
101118
maxRetriesPerRequest: 20,
102119
},

0 commit comments

Comments
 (0)