Skip to content

Commit a0fc0f3

Browse files
committed
feat(webapp): gate worker dequeues by worker queue via env var
Add RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES: a comma-separated list of worker queues (or base regions, which also cover the :scheduled split) for which the engine API refuses worker dequeue requests and returns no work, so those runs stay queued instead of being handed to workers that cannot run them. Unset means no gating. Blocked dequeues increment the run_engine.dequeue.blocked otel counter, tagged by worker_queue and region.
1 parent 5667461 commit a0fc0f3

6 files changed

Lines changed: 114 additions & 3 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Add a `RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES` setting that refuses worker dequeue requests for the listed worker queues (or base regions), so their runs stay queued instead of being handed to workers that can't run them.

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,7 @@ const EnvironmentSchema = z
809809
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
810810
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
811811
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
812+
RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES: z.string().optional(),
812813
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000),
813814
RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000),
814815
RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { getMeter } from "@internal/tracing";
2+
import { env } from "~/env.server";
3+
import {
4+
baseWorkerQueue,
5+
matchesDisabledWorkerQueue,
6+
parseDisabledWorkerQueues,
7+
} from "./workerQueueSplit.server";
8+
9+
// Meter under which the dequeue gate's metrics are registered.
const meter = getMeter("run-engine-dequeue-gate");

// Counter incremented once per refused worker dequeue request (see recordBlockedDequeue).
const blockedDequeueCounter = meter.createCounter("run_engine.dequeue.blocked", {
  description:
    "Count of worker dequeue requests refused because the worker queue is gated off via RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES",
});

// Parsed a single time when this module is first evaluated; the resulting set
// is reused by every isWorkerQueueDequeueDisabled call.
const disabledWorkerQueues = parseDisabledWorkerQueues(
  env.RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES
);
19+
20+
/**
 * Reports whether dequeues for the given worker queue are gated off.
 *
 * Delegates to `matchesDisabledWorkerQueue` using the set parsed from
 * `RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES` at module load.
 *
 * @param workerQueue - The worker queue a worker is asking to dequeue from.
 * @returns `true` when the queue matches the configured disabled set.
 */
export function isWorkerQueueDequeueDisabled(workerQueue: string): boolean {
  return matchesDisabledWorkerQueue(workerQueue, disabledWorkerQueues);
}
23+
24+
/**
 * Increments the `run_engine.dequeue.blocked` counter by one for a refused
 * dequeue request.
 *
 * The data point is tagged with the full worker queue name (`worker_queue`)
 * and with the value `baseWorkerQueue` returns for it (`region`).
 *
 * @param workerQueue - The worker queue whose dequeue request was refused.
 */
export function recordBlockedDequeue(workerQueue: string): void {
  blockedDequeueCounter.add(1, {
    worker_queue: workerQueue,
    region: baseWorkerQueue(workerQueue),
  });
}

apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,25 @@ export function workerQueueForClass(
122122

123123
return masterQueue;
124124
}
125+
126+
/**
 * Parses a comma-separated list of worker queue names into a set.
 *
 * Each entry is trimmed of surrounding whitespace, empty entries are dropped,
 * and duplicates collapse because the result is a `Set`.
 *
 * @param raw - The raw comma-separated value; `undefined` is treated as empty.
 * @returns The set of non-empty, trimmed entries (empty when nothing is listed).
 */
export function parseDisabledWorkerQueues(raw: string | undefined): Set<string> {
  return new Set(
    (raw ?? "")
      .split(",")
      .map((entry) => entry.trim())
      // Drops the "" entries produced by blank input, doubled or trailing commas.
      .filter(Boolean)
  );
}
134+
135+
/**
 * Checks whether a worker queue is covered by a set of disabled entries.
 *
 * A queue matches when the set contains either its exact name or the value
 * `baseWorkerQueue` returns for it, so listing a base name also covers the
 * queues derived from that base.
 *
 * @param workerQueue - The worker queue name to check.
 * @param disabledWorkerQueues - The disabled entries (exact queue names or base names).
 * @returns `true` when the queue is covered; always `false` for an empty set.
 */
export function matchesDisabledWorkerQueue(
  workerQueue: string,
  disabledWorkerQueues: ReadonlySet<string>
): boolean {
  // Nothing configured: skip the base-name lookup entirely.
  if (disabledWorkerQueues.size === 0) {
    return false;
  }

  return (
    disabledWorkerQueues.has(workerQueue) || disabledWorkerQueues.has(baseWorkerQueue(workerQueue))
  );
}

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ import { singleton } from "~/utils/singleton";
2828
import { resolveVariablesForEnvironment } from "~/v3/environmentVariables/environmentVariablesRepository.server";
2929
import { machinePresetFromName } from "~/v3/machinePresets.server";
3030
import { workerQueueForClass } from "~/runEngine/concerns/workerQueueSplit.server";
31+
import {
32+
isWorkerQueueDequeueDisabled,
33+
recordBlockedDequeue,
34+
} from "~/runEngine/concerns/dequeueGate.server";
3135
import { WithRunEngine, WithRunEngineOptions } from "../baseService.server";
3236

3337
const authenticatedWorkerInstanceCache = singleton(
@@ -377,11 +381,16 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
377381
runnerId?: string;
378382
queueClass?: WorkerQueueClass;
379383
}): Promise<DequeuedMessage[]> {
380-
// Derive the actual queue from this worker's own masterQueue + class, so a
381-
// token can only ever reach its own region's queues (default or :scheduled).
384+
const workerQueue = workerQueueForClass(this.masterQueue, queueClass);
385+
386+
if (isWorkerQueueDequeueDisabled(workerQueue)) {
387+
recordBlockedDequeue(workerQueue);
388+
return [];
389+
}
390+
382391
return await this._engine.dequeueFromWorkerQueue({
383392
consumerId: this.workerInstanceId,
384-
workerQueue: workerQueueForClass(this.masterQueue, queueClass),
393+
workerQueue,
385394
workerId: this.workerInstanceId,
386395
runnerId,
387396
});
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { describe, expect, it } from "vitest";
2+
import {
3+
matchesDisabledWorkerQueue,
4+
parseDisabledWorkerQueues,
5+
} from "~/runEngine/concerns/workerQueueSplit.server";
6+
7+
// Covers parsing of the raw comma-separated setting into a set of entries.
describe("parseDisabledWorkerQueues", () => {
  it("returns an empty set for undefined or empty input", () => {
    expect(parseDisabledWorkerQueues(undefined).size).toBe(0);
    expect(parseDisabledWorkerQueues("").size).toBe(0);
    // Only separators and whitespace: every entry is dropped.
    expect(parseDisabledWorkerQueues(" , ,").size).toBe(0);
  });

  it("splits, trims, and drops empties", () => {
    const parsed = parseDisabledWorkerQueues(" eu-central-1 , us-east-1:scheduled ,, ");
    expect([...parsed]).toEqual(["eu-central-1", "us-east-1:scheduled"]);
  });
});
19+
20+
// Covers matching of a worker queue against the parsed disabled set.
describe("matchesDisabledWorkerQueue", () => {
  it("never matches when the disabled set is empty", () => {
    const empty = parseDisabledWorkerQueues(undefined);
    expect(matchesDisabledWorkerQueue("eu-central-1", empty)).toBe(false);
    expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", empty)).toBe(false);
  });

  it("gates the base region and its scheduled split when the base region is listed", () => {
    const disabled = parseDisabledWorkerQueues("eu-central-1");
    expect(matchesDisabledWorkerQueue("eu-central-1", disabled)).toBe(true);
    expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", disabled)).toBe(true);
  });

  it("leaves other regions alone", () => {
    const disabled = parseDisabledWorkerQueues("eu-central-1");
    expect(matchesDisabledWorkerQueue("us-east-1", disabled)).toBe(false);
    expect(matchesDisabledWorkerQueue("us-east-1:scheduled", disabled)).toBe(false);
  });

  it("gates only the scheduled split when a full worker queue is listed", () => {
    const disabled = parseDisabledWorkerQueues("eu-central-1:scheduled");
    expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", disabled)).toBe(true);
    // The base queue itself is not listed, so it stays open.
    expect(matchesDisabledWorkerQueue("eu-central-1", disabled)).toBe(false);
  });
});

0 commit comments

Comments
 (0)