Skip to content

Commit 51911a1

Browse files
d-csclaude
andcommitted
test(redis-worker): align drainer tests with buffer ack/fail semantics
After the buffer extensions in this PR: - ack() keeps the entry alive with a grace TTL as a read-fallback safety net. Test asserts the entry persists with materialised=true. - fail() deletes the entry once the drainer-handler has written the canonical SYSTEM_FAILURE PG row. Tests assert the entry is null and use runOnce()'s `failed` counter as the surviving signal. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1cd2da6 commit 51911a1

1 file changed

Lines changed: 16 additions & 7 deletions

File tree

packages/redis-worker/src/mollifier/drainer.test.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { MollifierDrainer } from "./drainer.js";
66
import { serialiseSnapshot } from "./schemas.js";
77

88
const noopOptions = {
9-
entryTtlSeconds: 600,
109
logger: new Logger("test", "log"),
1110
};
1211

@@ -87,8 +86,11 @@ describe("MollifierDrainer.runOnce", () => {
8786
payload: { foo: 1 },
8887
});
8988

89+
// After ack the entry persists as a read-fallback safety net with
90+
// materialised=true and a fresh grace TTL (Q1 D2 / Phase B2).
9091
const entry = await buffer.getEntry("run_1");
91-
expect(entry).toBeNull();
92+
expect(entry).not.toBeNull();
93+
expect(entry!.materialised).toBe(true);
9294
} finally {
9395
await buffer.close();
9496
}
@@ -167,9 +169,14 @@ describe("MollifierDrainer error handling", () => {
167169
expect(after2!.status).toBe("QUEUED");
168170
expect(after2!.attempts).toBe(2);
169171

170-
await drainer.runOnce();
172+
const result3 = await drainer.runOnce();
173+
// On attempt 3 the drainer hits maxAttempts and calls fail(),
174+
// which deletes the entry — once the drainer-handler has written
175+
// the SYSTEM_FAILURE PG row the buffer entry is no longer
176+
// load-bearing. The runOnce result is the surviving signal.
171177
const after3 = await buffer.getEntry("run_r");
172-
expect(after3!.status).toBe("FAILED");
178+
expect(after3).toBeNull();
179+
expect(result3.failed).toBe(1);
173180
expect(calls).toBe(3);
174181
} finally {
175182
await buffer.close();
@@ -202,11 +209,13 @@ describe("MollifierDrainer error handling", () => {
202209
try {
203210
await buffer.accept({ runId: "run_nr", envId: "env_a", orgId: "org_1", payload: "{}" });
204211

205-
await drainer.runOnce();
212+
const result = await drainer.runOnce();
206213

214+
// fail() deletes the entry once the drainer-handler has written
215+
// the canonical SYSTEM_FAILURE PG row.
207216
const entry = await buffer.getEntry("run_nr");
208-
expect(entry!.status).toBe("FAILED");
209-
expect(entry!.lastError).toEqual({ code: "Error", message: "validation failure" });
217+
expect(entry).toBeNull();
218+
expect(result.failed).toBe(1);
210219
} finally {
211220
await buffer.close();
212221
}

0 commit comments

Comments
 (0)