@@ -238,8 +238,8 @@ describe("MollifierBuffer.pop orphan handling", () => {
238238 } ) ;
239239
240240 try {
241- // Simulate a TTL-expired orphan: queue ref exists, entry hash does not.
242- await buffer [ "redis" ] . zadd ( "mollifier:queue:env_a" , 1 , "run_orphan" ) ;
241+ // Simulate an evicted orphan: queue ref exists, entry hash does not.
242+ await buffer [ "redis" ] . rpush ( "mollifier:queue:env_a" , "run_orphan" ) ;
243243
244244 const popped = await buffer . pop ( "env_a" ) ;
245245 expect ( popped ) . toBeNull ( ) ;
@@ -249,7 +249,7 @@ describe("MollifierBuffer.pop orphan handling", () => {
249249 expect ( Object . keys ( raw ) ) . toHaveLength ( 0 ) ;
250250
251251 // Queue is drained — the loop pops orphans until empty.
252- const qLen = await buffer [ "redis" ] . zcard ( "mollifier:queue:env_a" ) ;
252+ const qLen = await buffer [ "redis" ] . llen ( "mollifier:queue:env_a" ) ;
253253 expect ( qLen ) . toBe ( 0 ) ;
254254 } finally {
255255 await buffer . close ( ) ;
@@ -271,20 +271,21 @@ describe("MollifierBuffer.pop orphan handling", () => {
271271 } ) ;
272272
273273 try {
274- // Layout by score (lowest -first, since ZPOPMIN takes the min):
275- // orphan_a (score 1) → valid (score = its createdAtMicros, large) → orphan_b (score 1e18).
276- // First pop skips orphan_a, returns valid; orphan_b remains.
277- await buffer [ "redis" ] . zadd ( "mollifier:queue:env_a" , 1 , " orphan_a" ) ;
274+ // Build the queue so RPOP (tail -first) yields: orphan_a, valid,
275+ // orphan_b. accept LPUSHes " valid"; RPUSH puts orphan_a at the
276+ // tail (popped first), LPUSH puts orphan_b at the head (popped
277+ // last). First pop skips orphan_a, returns valid; orphan_b remains.
278278 await buffer . accept ( { runId : "valid" , envId : "env_a" , orgId : "org_1" , payload : "{}" } ) ;
279- await buffer [ "redis" ] . zadd ( "mollifier:queue:env_a" , 1e18 , "orphan_b" ) ;
279+ await buffer [ "redis" ] . rpush ( "mollifier:queue:env_a" , "orphan_a" ) ;
280+ await buffer [ "redis" ] . lpush ( "mollifier:queue:env_a" , "orphan_b" ) ;
280281
281282 const popped = await buffer . pop ( "env_a" ) ;
282283 expect ( popped ) . not . toBeNull ( ) ;
283284 expect ( popped ! . runId ) . toBe ( "valid" ) ;
284285 expect ( popped ! . status ) . toBe ( "DRAINING" ) ;
285286
286287 // The trailing orphan_b is still in the queue (single pop call).
287- const remaining = await buffer [ "redis" ] . zcard ( "mollifier:queue:env_a" ) ;
288+ const remaining = await buffer [ "redis" ] . llen ( "mollifier:queue:env_a" ) ;
288289 expect ( remaining ) . toBe ( 1 ) ;
289290
290291 // A second pop drains the trailing orphan_b. The queue is now
@@ -559,13 +560,14 @@ describe("MollifierBuffer.requeue on missing entry", () => {
559560
560561describe ( "MollifierBuffer.requeue ordering" , ( ) => {
561562 redisTest (
562- "requeued entry retains its original createdAt and pops next (oldest-first by createdAt) " ,
563+ "requeued entry pops next (RPUSH to the RPOP/tail end), preserving FIFO " ,
563564 { timeout : 20_000 } ,
564565 async ( { redisContainer } ) => {
565- // Score == createdAtMicros; requeue does not bump the score. The
566- // oldest entry continues to pop first across retries. `maxAttempts`
567- // in the drainer bounds the retry loop for a persistently failing
568- // entry (after which it goes to the `fail` path, not requeue).
566+ // LIST FIFO: accept LPUSHes at the head, pop RPOPs from the tail, so
567+ // the first-accepted entry pops first. requeue RPUSHes back to the
568+ // tail, so a transiently failed entry pops next rather than going to
569+ // the back. `maxAttempts` in the drainer bounds the retry loop for a
570+ // persistently failing entry (after which it goes to `fail`, not requeue).
569571 const buffer = new MollifierBuffer ( {
570572 redisOptions : {
571573 host : redisContainer . getHost ( ) ,
@@ -577,17 +579,15 @@ describe("MollifierBuffer.requeue ordering", () => {
577579
578580 try {
579581 await buffer . accept ( { runId : "a" , envId : "env_a" , orgId : "org_1" , payload : "{}" } ) ;
580- await new Promise ( ( r ) => setTimeout ( r , 2 ) ) ;
581582 await buffer . accept ( { runId : "b" , envId : "env_a" , orgId : "org_1" , payload : "{}" } ) ;
582- await new Promise ( ( r ) => setTimeout ( r , 2 ) ) ;
583583 await buffer . accept ( { runId : "c" , envId : "env_a" , orgId : "org_1" , payload : "{}" } ) ;
584584
585585 const first = await buffer . pop ( "env_a" ) ;
586586 expect ( first ! . runId ) . toBe ( "a" ) ;
587587
588588 await buffer . requeue ( "a" ) ;
589589
590- // a still has the smallest createdAtMicros → pops next.
590+ // a was RPUSHed back to the tail → pops next, ahead of b and c .
591591 const next = await buffer . pop ( "env_a" ) ;
592592 expect ( next ! . runId ) . toBe ( "a" ) ;
593593 const after = await buffer . pop ( "env_a" ) ;
@@ -2045,9 +2045,9 @@ describe("MollifierBuffer.mutateSnapshot", () => {
20452045 ) ;
20462046} ) ;
20472047
2048- describe ( "MollifierBuffer ZSET storage" , ( ) => {
2048+ describe ( "MollifierBuffer LIST storage" , ( ) => {
20492049 redisTest (
2050- "queue key is a ZSET scored by entry's createdAtMicros " ,
2050+ "queue key is a LIST; createdAtMicros is a hash field, not a sort key " ,
20512051 { timeout : 20_000 } ,
20522052 async ( { redisContainer } ) => {
20532053 const buffer = new MollifierBuffer ( {
@@ -2062,32 +2062,29 @@ describe("MollifierBuffer ZSET storage", () => {
20622062 try {
20632063 await buffer . accept ( { runId : "z1" , envId : "env_z" , orgId : "org_1" , payload : "{}" } ) ;
20642064
2065- // ZSET-only commands must succeed against the queue key.
2066- const card = await buffer [ "redis" ] . zcard ( "mollifier:queue:env_z" ) ;
2067- expect ( card ) . toBe ( 1 ) ;
2065+ // LIST-only commands must succeed against the queue key.
2066+ const len = await buffer [ "redis" ] . llen ( "mollifier:queue:env_z" ) ;
2067+ expect ( len ) . toBe ( 1 ) ;
2068+ const members = await buffer [ "redis" ] . lrange ( "mollifier:queue:env_z" , 0 , - 1 ) ;
2069+ expect ( members ) . toEqual ( [ "z1" ] ) ;
20682070
2069- const score = await buffer [ "redis" ] . zscore ( "mollifier:queue:env_z" , "z1" ) ;
2070- expect ( score ) . not . toBeNull ( ) ;
2071- const scoreNum = Number ( score ) ;
2072- expect ( Number . isFinite ( scoreNum ) ) . toBe ( true ) ;
2071+ // The queue holds no score — it's not a ZSET.
2072+ await expect ( buffer [ "redis" ] . zscore ( "mollifier:queue:env_z" , "z1" ) ) . rejects . toThrow ( ) ;
20732073
2074- // Score matches the entry hash's createdAtMicros field.
2075- const micros = await buffer [ "redis" ] . hget ( "mollifier:entries:z1" , "createdAtMicros" ) ;
2076- expect ( micros ) . not . toBeNull ( ) ;
2077- expect ( Number ( micros ) ) . toBe ( scoreNum ) ;
2078-
2079- // Score is plausibly recent (within last minute as microseconds).
2074+ // createdAtMicros lives on the entry hash (for dwell metrics) and
2075+ // is plausibly recent (within the last minute, as microseconds).
2076+ const micros = Number ( await buffer [ "redis" ] . hget ( "mollifier:entries:z1" , "createdAtMicros" ) ) ;
20802077 const nowMicros = Date . now ( ) * 1000 ;
2081- expect ( scoreNum ) . toBeGreaterThan ( nowMicros - 60_000_000 ) ;
2082- expect ( scoreNum ) . toBeLessThanOrEqual ( nowMicros + 1_000_000 ) ;
2078+ expect ( micros ) . toBeGreaterThan ( nowMicros - 60_000_000 ) ;
2079+ expect ( micros ) . toBeLessThanOrEqual ( nowMicros + 1_000_000 ) ;
20832080 } finally {
20842081 await buffer . close ( ) ;
20852082 }
20862083 } ,
20872084 ) ;
20882085
20892086 redisTest (
2090- "pop returns entries in ascending createdAtMicros order (FIFO by time, not by member )" ,
2087+ "pop returns entries in FIFO insertion order (independent of member lex order )" ,
20912088 { timeout : 20_000 } ,
20922089 async ( { redisContainer } ) => {
20932090 const buffer = new MollifierBuffer ( {
@@ -2100,11 +2097,10 @@ describe("MollifierBuffer ZSET storage", () => {
21002097 } ) ;
21012098
21022099 try {
2103- // Insert runIds in reverse-lex order to prove ordering is by score, not member.
2100+ // Accept in reverse-lex order to prove ordering is by insertion
2101+ // (LPUSH head / RPOP tail), not by member value.
21042102 await buffer . accept ( { runId : "zzz" , envId : "env_o" , orgId : "org_1" , payload : "{}" } ) ;
2105- await new Promise ( ( r ) => setTimeout ( r , 5 ) ) ;
21062103 await buffer . accept ( { runId : "mmm" , envId : "env_o" , orgId : "org_1" , payload : "{}" } ) ;
2107- await new Promise ( ( r ) => setTimeout ( r , 5 ) ) ;
21082104 await buffer . accept ( { runId : "aaa" , envId : "env_o" , orgId : "org_1" , payload : "{}" } ) ;
21092105
21102106 const first = await buffer . pop ( "env_o" ) ;
@@ -2120,7 +2116,7 @@ describe("MollifierBuffer ZSET storage", () => {
21202116 ) ;
21212117
21222118 redisTest (
2123- "requeue keeps original score ; createdAt is immutable across retries" ,
2119+ "requeue re-enqueues to the LIST ; createdAt is immutable across retries" ,
21242120 { timeout : 20_000 } ,
21252121 async ( { redisContainer } ) => {
21262122 const buffer = new MollifierBuffer ( {
@@ -2134,24 +2130,17 @@ describe("MollifierBuffer ZSET storage", () => {
21342130
21352131 try {
21362132 await buffer . accept ( { runId : "rq" , envId : "env_rq" , orgId : "org_1" , payload : "{}" } ) ;
2137- const originalScore = Number (
2138- await buffer [ "redis" ] . zscore ( "mollifier:queue:env_rq" , "rq" ) ,
2139- ) ;
2140- const originalMicros = Number (
2141- await buffer [ "redis" ] . hget ( "mollifier:entries:rq" , "createdAtMicros" ) ,
2142- ) ;
2133+ const originalMicros = await buffer [ "redis" ] . hget ( "mollifier:entries:rq" , "createdAtMicros" ) ;
21432134
21442135 await buffer . pop ( "env_rq" ) ;
2145- await new Promise ( ( r ) => setTimeout ( r , 5 ) ) ;
2136+ // Queue is empty after the pop.
2137+ expect ( await buffer [ "redis" ] . llen ( "mollifier:queue:env_rq" ) ) . toBe ( 0 ) ;
2138+
21462139 await buffer . requeue ( "rq" ) ;
21472140
2148- const newScore = Number (
2149- await buffer [ "redis" ] . zscore ( "mollifier:queue:env_rq" , "rq" ) ,
2150- ) ;
2151- const newMicros = Number (
2152- await buffer [ "redis" ] . hget ( "mollifier:entries:rq" , "createdAtMicros" ) ,
2153- ) ;
2154- expect ( newScore ) . toBe ( originalScore ) ;
2141+ // Back on the LIST, and createdAtMicros is unchanged.
2142+ expect ( await buffer [ "redis" ] . lrange ( "mollifier:queue:env_rq" , 0 , - 1 ) ) . toEqual ( [ "rq" ] ) ;
2143+ const newMicros = await buffer [ "redis" ] . hget ( "mollifier:entries:rq" , "createdAtMicros" ) ;
21552144 expect ( newMicros ) . toBe ( originalMicros ) ;
21562145 } finally {
21572146 await buffer . close ( ) ;
@@ -2234,181 +2223,6 @@ describe("MollifierBuffer.listEntriesForEnv", () => {
22342223 } ) ;
22352224} ) ;
22362225
2237- describe ( "MollifierBuffer.listForEnvWithWatermark" , ( ) => {
2238- // Seed a QUEUED entry, then pin its ZSET score and hash `createdAtMicros`
2239- // to a deterministic value so ordering and the watermark cursor don't
2240- // depend on wall-clock timing (Date.now() ties within a millisecond).
2241- async function seed ( buffer : MollifierBuffer , envId : string , runId : string , micros : number ) {
2242- await buffer . accept ( { runId, envId, orgId : "org_1" , payload : "{}" } ) ;
2243- await buffer [ "redis" ] . zadd ( `mollifier:queue:${ envId } ` , String ( micros ) , runId ) ;
2244- await buffer [ "redis" ] . hset ( `mollifier:entries:${ runId } ` , "createdAtMicros" , String ( micros ) ) ;
2245- }
2246-
2247- redisTest ( "pageSize <= 0 returns empty without hitting redis" , { timeout : 20_000 } , async ( { redisContainer } ) => {
2248- const buffer = new MollifierBuffer ( {
2249- redisOptions : {
2250- host : redisContainer . getHost ( ) ,
2251- port : redisContainer . getPort ( ) ,
2252- password : redisContainer . getPassword ( ) ,
2253- } ,
2254- logger : new Logger ( "test" , "log" ) ,
2255- } ) ;
2256- try {
2257- expect ( await buffer . listForEnvWithWatermark ( { envId : "env_w" , pageSize : 0 } ) ) . toEqual ( [ ] ) ;
2258- expect ( await buffer . listForEnvWithWatermark ( { envId : "env_w" , pageSize : - 3 } ) ) . toEqual ( [ ] ) ;
2259- } finally {
2260- await buffer . close ( ) ;
2261- }
2262- } ) ;
2263-
2264- redisTest (
2265- "page 1 returns newest-first up to pageSize without consuming entries" ,
2266- { timeout : 20_000 } ,
2267- async ( { redisContainer } ) => {
2268- const buffer = new MollifierBuffer ( {
2269- redisOptions : {
2270- host : redisContainer . getHost ( ) ,
2271- port : redisContainer . getPort ( ) ,
2272- password : redisContainer . getPassword ( ) ,
2273- } ,
2274- logger : new Logger ( "test" , "log" ) ,
2275- } ) ;
2276- try {
2277- await seed ( buffer , "env_w" , "wa" , 1000 ) ;
2278- await seed ( buffer , "env_w" , "wb" , 2000 ) ;
2279- await seed ( buffer , "env_w" , "wc" , 3000 ) ;
2280-
2281- const page = await buffer . listForEnvWithWatermark ( { envId : "env_w" , pageSize : 2 } ) ;
2282- // Newest-first by createdAtMicros.
2283- expect ( page . map ( ( e ) => e . runId ) ) . toEqual ( [ "wc" , "wb" ] ) ;
2284-
2285- // Non-destructive: drainer still pops oldest-first.
2286- const popped : string [ ] = [ ] ;
2287- for ( let i = 0 ; i < 3 ; i ++ ) {
2288- const e = await buffer . pop ( "env_w" ) ;
2289- if ( e ) popped . push ( e . runId ) ;
2290- }
2291- expect ( popped ) . toEqual ( [ "wa" , "wb" , "wc" ] ) ;
2292- } finally {
2293- await buffer . close ( ) ;
2294- }
2295- } ,
2296- ) ;
2297-
2298- redisTest (
2299- "page N continues strictly below the watermark score" ,
2300- { timeout : 20_000 } ,
2301- async ( { redisContainer } ) => {
2302- const buffer = new MollifierBuffer ( {
2303- redisOptions : {
2304- host : redisContainer . getHost ( ) ,
2305- port : redisContainer . getPort ( ) ,
2306- password : redisContainer . getPassword ( ) ,
2307- } ,
2308- logger : new Logger ( "test" , "log" ) ,
2309- } ) ;
2310- try {
2311- await seed ( buffer , "env_w" , "wa" , 1000 ) ;
2312- await seed ( buffer , "env_w" , "wb" , 2000 ) ;
2313- await seed ( buffer , "env_w" , "wc" , 3000 ) ;
2314-
2315- const page1 = await buffer . listForEnvWithWatermark ( { envId : "env_w" , pageSize : 2 } ) ;
2316- expect ( page1 . map ( ( e ) => e . runId ) ) . toEqual ( [ "wc" , "wb" ] ) ;
2317-
2318- const last = page1 [ page1 . length - 1 ] ! ;
2319- const page2 = await buffer . listForEnvWithWatermark ( {
2320- envId : "env_w" ,
2321- pageSize : 2 ,
2322- watermark : { createdAtMicros : last . createdAtMicros , runId : last . runId } ,
2323- } ) ;
2324- // Only the entry strictly below score 2000 remains; no overlap.
2325- expect ( page2 . map ( ( e ) => e . runId ) ) . toEqual ( [ "wa" ] ) ;
2326- } finally {
2327- await buffer . close ( ) ;
2328- }
2329- } ,
2330- ) ;
2331-
2332- redisTest (
2333- "tied-score watermark surfaces lex-smaller members on the next page without dupes" ,
2334- { timeout : 20_000 } ,
2335- async ( { redisContainer } ) => {
2336- const buffer = new MollifierBuffer ( {
2337- redisOptions : {
2338- host : redisContainer . getHost ( ) ,
2339- port : redisContainer . getPort ( ) ,
2340- password : redisContainer . getPassword ( ) ,
2341- } ,
2342- logger : new Logger ( "test" , "log" ) ,
2343- } ) ;
2344- try {
2345- // Three entries share one score; ZSET breaks the tie by member,
2346- // and zrevrangebyscore returns them member-DESC: tc, tb, ta.
2347- await seed ( buffer , "env_w" , "ta" , 2000 ) ;
2348- await seed ( buffer , "env_w" , "tb" , 2000 ) ;
2349- await seed ( buffer , "env_w" , "tc" , 2000 ) ;
2350-
2351- const page1 = await buffer . listForEnvWithWatermark ( { envId : "env_w" , pageSize : 2 } ) ;
2352- expect ( page1 . map ( ( e ) => e . runId ) ) . toEqual ( [ "tc" , "tb" ] ) ;
2353-
2354- const last = page1 [ page1 . length - 1 ] ! ;
2355- const page2 = await buffer . listForEnvWithWatermark ( {
2356- envId : "env_w" ,
2357- pageSize : 2 ,
2358- watermark : { createdAtMicros : last . createdAtMicros , runId : last . runId } ,
2359- } ) ;
2360- // Same score, lex-smaller than the "tb" anchor — and not "tb"
2361- // itself (no duplicate across the page boundary).
2362- expect ( page2 . map ( ( e ) => e . runId ) ) . toEqual ( [ "ta" ] ) ;
2363- } finally {
2364- await buffer . close ( ) ;
2365- }
2366- } ,
2367- ) ;
2368-
2369- redisTest (
2370- "skips orphan queue references (entry hash gone) during listing" ,
2371- { timeout : 20_000 } ,
2372- async ( { redisContainer } ) => {
2373- const buffer = new MollifierBuffer ( {
2374- redisOptions : {
2375- host : redisContainer . getHost ( ) ,
2376- port : redisContainer . getPort ( ) ,
2377- password : redisContainer . getPassword ( ) ,
2378- } ,
2379- logger : new Logger ( "test" , "log" ) ,
2380- } ) ;
2381- try {
2382- await seed ( buffer , "env_w" , "live" , 1000 ) ;
2383- await seed ( buffer , "env_w" , "orphan" , 2000 ) ;
2384- // Drop the orphan's hash but leave its queue ref behind.
2385- await buffer [ "redis" ] . del ( "mollifier:entries:orphan" ) ;
2386-
2387- const page = await buffer . listForEnvWithWatermark ( { envId : "env_w" , pageSize : 10 } ) ;
2388- expect ( page . map ( ( e ) => e . runId ) ) . toEqual ( [ "live" ] ) ;
2389- } finally {
2390- await buffer . close ( ) ;
2391- }
2392- } ,
2393- ) ;
2394-
2395- redisTest ( "returns empty for an env with no queued entries" , { timeout : 20_000 } , async ( { redisContainer } ) => {
2396- const buffer = new MollifierBuffer ( {
2397- redisOptions : {
2398- host : redisContainer . getHost ( ) ,
2399- port : redisContainer . getPort ( ) ,
2400- password : redisContainer . getPassword ( ) ,
2401- } ,
2402- logger : new Logger ( "test" , "log" ) ,
2403- } ) ;
2404- try {
2405- expect ( await buffer . listForEnvWithWatermark ( { envId : "env_empty_w" , pageSize : 10 } ) ) . toEqual ( [ ] ) ;
2406- } finally {
2407- await buffer . close ( ) ;
2408- }
2409- } ) ;
2410- } ) ;
2411-
24122226// Composite-key safety. The Redis-key builders concatenate
24132227// `(envId, taskIdentifier, idempotencyKey)` with `:` separators; without
24142228// per-segment encoding, `taskIdentifier="a:b"` and `idempotencyKey="x"`
0 commit comments