Skip to content

Commit adbb9ea

Browse files
d-csclaude
andcommitted
feat(redis-worker,webapp): mollifier buffer extensions + snapshot type
Adds the buffer-side data layer used by phase-3 work: - buffer.ts gains entry inspection (getEntry), idempotency lookup (lookupIdempotency), in-place snapshot mutation (mutateSnapshot), and dwell tracking — all atomic via Lua. - snapshot.server.ts: shared MollifierSnapshot type + (de)serialise. - Drops the entry-TTL config — the drainer is the recovery mechanism. Adds methods to the buffer interface; nothing consumes them yet. Subsequent PRs in the stack wire trigger-time mollify, read-fallback, and mutation paths against this surface. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 37eeaa3 commit adbb9ea

8 files changed

Lines changed: 1786 additions & 138 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/redis-worker": minor
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Mollifier buffer feature set built on top of the initial primitives: idempotency-lookup with SETNX dedup, atomic snapshot-mutation API (`mutateSnapshot` with tag/metadata/delay/cancel patches), metadata CAS for lossless concurrent updates, watermark-paginated listing, claim primitives for pre-gate idempotency, ZSET-backed per-env queue, 30s post-ack grace TTL, and drop the accept-time entry TTL (drainer is now the only removal mechanism). `@trigger.dev/core` gains an optional `notice` field on the trigger response so the SDK can surface mollifier-queued guidance to customers.

apps/webapp/app/env.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1095,7 +1095,6 @@ const EnvironmentSchema = z
10951095
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
10961096
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
10971097
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
1098-
TRIGGER_MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),
10991098
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
11001099
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
11011100
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),

apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ function initializeMollifierBuffer(): MollifierBuffer {
2222
enableAutoPipelining: true,
2323
...(env.TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
2424
},
25-
entryTtlSeconds: env.TRIGGER_MOLLIFIER_ENTRY_TTL_S,
2625
});
2726
}
2827

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { serialiseSnapshot, deserialiseSnapshot } from "@trigger.dev/redis-worker";
2+
3+
// MollifierSnapshot is the JSON-serialisable shape of the input that would be
4+
// passed to engine.trigger(). The drainer deserialises and replays it.
5+
// Kept as Record<string, unknown> at this layer — the engine.trigger call site
6+
// casts it to the engine's typed input. This keeps the mollifier subdirectory
7+
// from depending on @internal/run-engine internals.
8+
export type MollifierSnapshot = Record<string, unknown>;
9+
10+
export function serialiseMollifierSnapshot(input: MollifierSnapshot): string {
11+
return serialiseSnapshot(input);
12+
}
13+
14+
export function deserialiseMollifierSnapshot(serialised: string): MollifierSnapshot {
15+
return deserialiseSnapshot<MollifierSnapshot>(serialised);
16+
}

0 commit comments

Comments
 (0)