Skip to content

Commit b96bae2

Browse files
d-csclaude
andcommitted
fix(redis-worker): catch processEntry errors in mollifier drainer to keep batch alive
If buffer.requeue() or buffer.fail() throws during error recovery inside processEntry, the rejection used to escape processOneFromEnv and reject runOnce's Promise.all — discarding handler results from sibling envs in the same tick. Wrap processEntry in try/catch so the failed env is just counted as "failed" for the tick, matching the invariant stated in the processOneFromEnv comment. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 83c6933 commit b96bae2

2 files changed

Lines changed: 96 additions & 4 deletions

File tree

packages/redis-worker/src/mollifier/drainer.test.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,88 @@ describe("MollifierDrainer resilience to transient buffer errors", () => {
363363
expect(result.failed).toBe(1);
364364
expect(handled).toEqual(["run_good"]);
365365
});
366+
367+
it("a requeue failure during retry recovery doesn't poison the rest of the batch", async () => {
368+
// Regression: handler throws a retryable error → processEntry calls
369+
// buffer.requeue() inside its catch block. If requeue() itself throws
370+
// (Redis blip during error recovery), the rejection used to escape
371+
// processOneFromEnv unwrapped and reject the runOnce Promise.all,
372+
// dropping handler results from sibling envs in the same tick.
373+
const handled: string[] = [];
374+
const buffer = makeStubBuffer({
375+
...eachEnvAsOwnOrg(["bad", "good"]),
376+
pop: async (envId: string) =>
377+
({
378+
runId: envId === "bad" ? "run_bad" : "run_good",
379+
envId,
380+
orgId: "org_1",
381+
payload: "{}",
382+
attempts: 0,
383+
createdAt: new Date(),
384+
}) as any,
385+
requeue: async () => {
386+
throw new Error("simulated requeue failure");
387+
},
388+
});
389+
390+
const drainer = new MollifierDrainer({
391+
buffer,
392+
handler: async (input) => {
393+
handled.push(input.runId);
394+
if (input.runId === "run_bad") throw new Error("transient");
395+
},
396+
concurrency: 5,
397+
maxAttempts: 3,
398+
isRetryable: () => true,
399+
logger: new Logger("test-drainer", "log"),
400+
});
401+
402+
const result = await drainer.runOnce();
403+
// Two envs scheduled, one handler succeeded (drained), one handler threw
404+
// and its recovery requeue threw too — counted as failed, batch not poisoned.
405+
expect(result.drained).toBe(1);
406+
expect(result.failed).toBe(1);
407+
expect(new Set(handled)).toEqual(new Set(["run_bad", "run_good"]));
408+
});
409+
410+
it("a fail() throw during terminal recovery doesn't poison the rest of the batch", async () => {
411+
// Regression: handler throws a non-retryable error → processEntry calls
412+
// buffer.fail() inside its catch block. If fail() itself throws, the
413+
// rejection used to escape unwrapped and reject runOnce's Promise.all.
414+
const handled: string[] = [];
415+
const buffer = makeStubBuffer({
416+
...eachEnvAsOwnOrg(["bad", "good"]),
417+
pop: async (envId: string) =>
418+
({
419+
runId: envId === "bad" ? "run_bad" : "run_good",
420+
envId,
421+
orgId: "org_1",
422+
payload: "{}",
423+
attempts: 0,
424+
createdAt: new Date(),
425+
}) as any,
426+
fail: async () => {
427+
throw new Error("simulated fail() failure");
428+
},
429+
});
430+
431+
const drainer = new MollifierDrainer({
432+
buffer,
433+
handler: async (input) => {
434+
handled.push(input.runId);
435+
if (input.runId === "run_bad") throw new Error("terminal");
436+
},
437+
concurrency: 5,
438+
maxAttempts: 3,
439+
isRetryable: () => false,
440+
logger: new Logger("test-drainer", "log"),
441+
});
442+
443+
const result = await drainer.runOnce();
444+
expect(result.drained).toBe(1);
445+
expect(result.failed).toBe(1);
446+
expect(new Set(handled)).toEqual(new Set(["run_bad", "run_good"]));
447+
});
366448
});
367449

368450
describe("MollifierDrainer per-tick org cap", () => {

packages/redis-worker/src/mollifier/drainer.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,10 +226,11 @@ export class MollifierDrainer<TPayload = unknown> {
226226
return sorted[idx]!;
227227
}
228228

229-
// A `pop()` failure for one env (e.g. a Redis hiccup mid-batch) must not
229+
// A failure for one env (e.g. a Redis hiccup mid-batch in `pop`, or in
230+
// `requeue`/`fail` during error recovery inside `processEntry`) must not
230231
// poison the rest of the batch — `Promise.all` would otherwise reject and
231-
// bubble all the way to `loop()`. Catch here so the failed env is just
232-
// counted as "failed" for this tick and we move on.
232+
// bubble all the way to `loop()`. Catch both stages here so the failed env
233+
// is just counted as "failed" for this tick and we move on.
233234
private async processOneFromEnv(envId: string): Promise<"drained" | "failed" | "empty"> {
234235
let entry: BufferEntry | null;
235236
try {
@@ -239,7 +240,16 @@ export class MollifierDrainer<TPayload = unknown> {
239240
return "failed";
240241
}
241242
if (!entry) return "empty";
242-
return this.processEntry(entry);
243+
try {
244+
return await this.processEntry(entry);
245+
} catch (err) {
246+
this.logger.error("MollifierDrainer.processEntry failed", {
247+
envId,
248+
runId: entry.runId,
249+
err,
250+
});
251+
return "failed";
252+
}
243253
}
244254

245255
private async processEntry(entry: BufferEntry): Promise<"drained" | "failed"> {

0 commit comments

Comments
 (0)