Skip to content

Commit 02c0b71

Browse files
committed
refactor(webapp): move mollifier drainer bootstrap out of legacy worker.server.ts
worker.server.ts is the original graphile-worker / ZodWorker file — every task in its catalog is annotated "@deprecated, moved to commonWorker.server.ts" (or similar). Adding new lifecycle wiring there during phase-2 was a mis-routing. Move the SIGTERM/SIGINT registration + drainer.start() call into a new mollifierDrainerWorker.server.ts alongside the redis-worker workers, and invoke its initMollifierDrainerWorker() from entry.server.tsx right after Worker.init(). The drainer's own factory still validates shutdown timeouts before constructing; the bootstrap registers signal handlers BEFORE calling start(), preserving the create+start contract. Also adds a header to worker.server.ts marking it legacy and pointing new lifecycle code at the redis-worker pattern, so the next person doesn't have to re-derive the routing rule.
1 parent 6487461 commit 02c0b71

3 files changed

Lines changed: 91 additions & 51 deletions

File tree

apps/webapp/app/entry.server.tsx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import isbot from "isbot";
66
import { renderToPipeableStream } from "react-dom/server";
77
import { PassThrough } from "stream";
88
import * as Worker from "~/services/worker.server";
9+
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
910
import { bootstrap } from "./bootstrap";
1011
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
1112
import {
@@ -247,6 +248,8 @@ Worker.init().catch((error) => {
247248
logError(error);
248249
});
249250

251+
initMollifierDrainerWorker();
252+
250253
bootstrap().catch((error) => {
251254
logError(error);
252255
});

apps/webapp/app/services/worker.server.ts

Lines changed: 21 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,24 @@
1+
/**
2+
* ⚠️ LEGACY — Graphile-worker / ZodWorker setup. Do not touch.
3+
*
4+
* This file wires the original background-job system the webapp was
5+
* built on (`@internal/zod-worker` → graphile-worker → Postgres). It is
6+
* now in deprecation mode: every task in `workerCatalog` below is
7+
* annotated with `@deprecated, moved to <new home>` and the live jobs
8+
* for new features all run on `@trigger.dev/redis-worker` instead.
9+
*
10+
* Where to put new things:
11+
* - Background jobs / queues → use redis-worker, alongside
12+
* `~/v3/commonWorker.server.ts`, `~/v3/alertsWorker.server.ts`, or
13+
* `~/v3/batchTriggerWorker.server.ts`.
14+
* - Run lifecycle → `@internal/run-engine` via `~/v3/runEngine.server`.
15+
* - Custom polling loops with their own Redis connection → keep them
16+
* in their own lifecycle module (e.g. `~/v3/mollifierDrainerWorker.server.ts`)
17+
* and wire the bootstrap from `entry.server.tsx`. Don't reach into
18+
* `init()` below.
19+
*
20+
* Edit only when removing legacy paths.
21+
*/
122
import { ZodWorker } from "@internal/zod-worker";
223
import { DeliverEmailSchema } from "emails";
324
import { z } from "zod";
@@ -26,7 +47,6 @@ import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
2647
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
2748
import { RetryAttemptService } from "~/v3/services/retryAttempt.server";
2849
import { TimeoutDeploymentService } from "~/v3/services/timeoutDeployment.server";
29-
import { getMollifierDrainer } from "~/v3/mollifier/mollifierDrainer.server";
3050
import { GraphileMigrationHelperService } from "./db/graphileMigrationHelper.server";
3151
import { sendEmail } from "./email.server";
3252
import { logger } from "./logger.server";
@@ -107,7 +127,6 @@ let workerQueue: ZodWorker<typeof workerCatalog>;
107127

108128
declare global {
109129
var __worker__: ZodWorker<typeof workerCatalog>;
110-
var __mollifierShutdownRegistered__: boolean | undefined;
111130
}
112131

113132
// this is needed because in development we don't want to restart
@@ -130,55 +149,6 @@ export async function init() {
130149
if (env.WORKER_ENABLED === "true") {
131150
await workerQueue.initialize();
132151
}
133-
134-
// Only the worker role drains the mollifier buffer. API-only replicas
135-
// still produce into the buffer via the trigger hot path, but the
136-
// polling loop + Redis consumer connection only belongs on workers —
137-
// otherwise every webapp replica races for the same entries.
138-
if (env.WORKER_ENABLED !== "true") {
139-
return;
140-
}
141-
142-
try {
143-
// getMollifierDrainer() runs the singleton factory, which validates the
144-
// shutdown-timeout reconciliation against GRACEFUL_SHUTDOWN_TIMEOUT and
145-
// throws BEFORE constructing the drainer if it's misconfigured. The
146-
// outer catch below logs and aborts drainer registration on either that
147-
// validation error or a Redis init failure — no half-started state. The
148-
// returned drainer is configured-but-stopped; start() runs below, AFTER
149-
// the SIGTERM/SIGINT handlers are registered, so a signal landing during
150-
// boot can never find the polling loop running without a graceful-stop
151-
// path. Same `__mollifierShutdownRegistered__` guard owns both the
152-
// handler registration and the start() call so dev hot-reloads don't
153-
// double-register or double-start.
154-
const drainer = getMollifierDrainer();
155-
if (drainer && !global.__mollifierShutdownRegistered__) {
156-
// The drainer owns a polling loop and a Redis client; let it drain
157-
// in-flight pops on shutdown rather than tearing the process down
158-
// mid-handler. `init()` is called per request from entry.server.tsx,
159-
// and `process.once()` only removes its listener after it fires — so
160-
// without a process-global guard, dev hot-reloads would stack a fresh
161-
// listener pair every request. Mirrors the `__worker__` singleton
162-
// pattern above.
163-
// Bound shutdown so a hung handler can't block process exit past the
164-
// pod's termination grace period. `drainer.stop({ timeoutMs })` logs a
165-
// warning and returns if the deadline is hit while a handler is still
166-
// in flight.
167-
const stopDrainer = () => {
168-
drainer
169-
.stop({ timeoutMs: env.MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS })
170-
.catch((error) => {
171-
logger.error("Failed to stop mollifier drainer", { error });
172-
});
173-
};
174-
process.once("SIGTERM", stopDrainer);
175-
process.once("SIGINT", stopDrainer);
176-
global.__mollifierShutdownRegistered__ = true;
177-
drainer.start();
178-
}
179-
} catch (error) {
180-
logger.error("Failed to initialise mollifier drainer", { error });
181-
}
182152
}
183153

184154
function getWorkerQueue() {
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { env } from "~/env.server";
2+
import { logger } from "~/services/logger.server";
3+
import { getMollifierDrainer } from "./mollifier/mollifierDrainer.server";
4+
5+
declare global {
6+
// eslint-disable-next-line no-var
7+
var __mollifierShutdownRegistered__: boolean | undefined;
8+
}
9+
10+
/**
11+
* Bootstraps the mollifier drainer.
12+
*
13+
* Two-step lifecycle:
14+
* 1. Construct the drainer via the gated singleton in
15+
* `mollifierDrainer.server.ts`. That factory validates the
16+
* shutdown-timeout reconciliation against `GRACEFUL_SHUTDOWN_TIMEOUT`
17+
* and throws BEFORE returning if it's misconfigured; the returned
18+
* drainer is configured-but-stopped.
19+
* 2. Register SIGTERM/SIGINT shutdown handlers, then call
20+
* `drainer.start()`. Doing this in the bootstrap (and not in the
21+
* factory) guarantees a signal landing during boot can never find
22+
* the polling loop running without a graceful-stop path.
23+
*
24+
* The drainer is intentionally NOT wired through `~/services/worker.server`
25+
* — that file is the legacy ZodWorker / graphile-worker setup. The
26+
* mollifier drainer is a custom polling loop over `MollifierBuffer`, not
27+
* a graphile-worker job, so it gets its own lifecycle file alongside the
28+
* redis-worker workers (`commonWorker`, `alertsWorker`,
29+
* `batchTriggerWorker`).
30+
*
31+
* Gating order:
32+
* - `WORKER_ENABLED !== "true"` → early return (API-only replicas
33+
* still produce into the buffer via the trigger hot path; only worker
34+
* replicas drain it, otherwise every replica races for the same
35+
* entries).
36+
* - `MOLLIFIER_ENABLED !== "1"` → `getMollifierDrainer()` returns null
37+
* and the bootstrap is a no-op.
38+
*/
39+
export function initMollifierDrainerWorker(): void {
40+
if (env.WORKER_ENABLED !== "true") {
41+
return;
42+
}
43+
44+
try {
45+
const drainer = getMollifierDrainer();
46+
if (drainer && !global.__mollifierShutdownRegistered__) {
47+
// `__mollifierShutdownRegistered__` guards against double-register
48+
// on dev hot-reloads (this bootstrap is called from
49+
// entry.server.tsx, which Remix dev re-evaluates on every change).
50+
// Same guard owns both the handler registration and the start()
51+
// call so the two never get out of sync.
52+
const stopDrainer = () => {
53+
drainer
54+
.stop({ timeoutMs: env.MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS })
55+
.catch((error) => {
56+
logger.error("Failed to stop mollifier drainer", { error });
57+
});
58+
};
59+
process.once("SIGTERM", stopDrainer);
60+
process.once("SIGINT", stopDrainer);
61+
global.__mollifierShutdownRegistered__ = true;
62+
drainer.start();
63+
}
64+
} catch (error) {
65+
logger.error("Failed to initialise mollifier drainer", { error });
66+
}
67+
}

0 commit comments

Comments
 (0)