Skip to content

Commit 3179918

Browse files
committed
feat(supervisor): evaluate backpressure sources independently and OR them
1 parent e2eefd8 commit 3179918

3 files changed

Lines changed: 80 additions & 83 deletions

File tree

apps/supervisor/src/env.test.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,20 @@ const base = {
2222
};
2323

2424
describe("Env superRefine - backpressure source awareness", () => {
25-
it("accepts k8s-pod-count source without a Redis host", () => {
25+
it("pod-count source can be enabled without a Redis host", () => {
2626
expect(() =>
2727
Env.parse({
2828
...base,
29-
TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true",
30-
TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: "k8s-pod-count",
29+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: "true",
3130
})
3231
).not.toThrow();
3332
});
3433

35-
it("rejects redis source when Redis host is absent", () => {
34+
it("redis source requires a Redis host", () => {
3635
expect(() =>
3736
Env.parse({
3837
...base,
3938
TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true",
40-
TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: "redis",
4139
})
4240
).toThrow();
4341
});

apps/supervisor/src/env.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,8 @@ export const Env = z
7373
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME: z.string().optional(),
7474
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD: z.string().optional(),
7575
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED: BoolEnv.default(false),
76-
// Backpressure signal source. "redis" reads a verdict from a Redis key;
77-
// "k8s-pod-count" scrapes the cluster apiserver's total pod-object count and
78-
// engages above ENGAGE, releasing below RELEASE (hysteresis).
79-
TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: z.enum(["redis", "k8s-pod-count"]).default("redis"),
76+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: BoolEnv.default(false),
77+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_DRY_RUN: BoolEnv.default(true),
8078
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE: z.coerce.number().int().positive().default(10_000),
8179
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE: z.coerce.number().int().positive().default(5_000),
8280
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS: z.coerce.number().int().positive().default(5_000),
@@ -333,11 +331,7 @@ export const Env = z
333331
path: ["TRIGGER_WORKLOAD_API_DOMAIN"],
334332
});
335333
}
336-
if (
337-
data.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED &&
338-
data.TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE === "redis" &&
339-
!data.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST
340-
) {
334+
if (data.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED && !data.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST) {
341335
ctx.addIssue({
342336
code: z.ZodIssueCode.custom,
343337
message:

apps/supervisor/src/index.ts

Lines changed: 74 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,7 @@ import {
3333
} from "./services/warmStartVerificationService.js";
3434
import { extractTraceparent, getRestoreRunnerId } from "./util.js";
3535
import { Redis } from "ioredis";
36-
import {
37-
BackpressureMonitor,
38-
type BackpressureSignalSource,
39-
} from "./backpressure/backpressureMonitor.js";
36+
import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js";
4037
import { RedisBackpressureSignalSource } from "./backpressure/redisBackpressureSignalSource.js";
4138
import { BackpressureMetrics } from "./backpressure/backpressureMetrics.js";
4239
import { K8sPodCountSignalSource } from "./backpressure/k8sPodCountSignalSource.js";
@@ -76,7 +73,7 @@ class ManagedSupervisor {
7673
private readonly podCleaner?: PodCleaner;
7774
private readonly failedPodHandler?: FailedPodHandler;
7875
private readonly tracing?: OtlpTraceService;
79-
private readonly backpressureMonitor?: BackpressureMonitor;
76+
private readonly backpressureMonitors: BackpressureMonitor[] = [];
8077
private readonly backpressureRedis?: Redis;
8178

8279
private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED);
@@ -217,71 +214,79 @@ class ManagedSupervisor {
217214
);
218215
}
219216

217+
// Redis-verdict source (external aggregator). Keeps existing metric names.
220218
if (env.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED) {
221-
let source: BackpressureSignalSource;
222-
let refreshIntervalMs = env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS;
223-
224-
if (env.TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE === "k8s-pod-count") {
225-
refreshIntervalMs = env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS;
226-
if (
227-
env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE >=
228-
env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE
229-
) {
230-
throw new Error(
231-
"TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE must be less than TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE"
232-
);
233-
}
234-
const podCountGauge = new Gauge({
235-
name: "supervisor_cluster_pod_count",
236-
help: "Total pod objects stored in the cluster, scraped for backpressure",
237-
registers: [register],
238-
});
239-
source = new K8sPodCountSignalSource({
240-
fetchMetrics: createApiserverMetricsFetcher(),
241-
engageThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE,
242-
releaseThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE,
243-
reportPodCount: (count) => podCountGauge.set(count),
244-
});
245-
this.logger.log("🛑 Dequeue backpressure enabled (pod-count source)", {
246-
engage: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE,
247-
release: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE,
248-
refreshIntervalMs,
249-
dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN,
250-
});
251-
} else {
252-
this.backpressureRedis = new Redis({
253-
host: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST,
254-
port: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT,
255-
username: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME,
256-
password: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD,
257-
...(env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED ? {} : { tls: {} }),
258-
maxRetriesPerRequest: null,
259-
});
260-
this.backpressureRedis.on("error", (error) =>
261-
this.logger.error("Backpressure redis error", { error: error.message })
262-
);
263-
source = new RedisBackpressureSignalSource(
264-
this.backpressureRedis,
265-
env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY
266-
);
267-
this.logger.log("🛑 Dequeue backpressure enabled (redis source)", {
268-
key: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY,
219+
this.backpressureRedis = new Redis({
220+
host: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST,
221+
port: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT,
222+
username: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME,
223+
password: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD,
224+
...(env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED ? {} : { tls: {} }),
225+
maxRetriesPerRequest: null,
226+
});
227+
this.backpressureRedis.on("error", (error) =>
228+
this.logger.error("Backpressure redis error", { error: error.message })
229+
);
230+
this.backpressureMonitors.push(
231+
new BackpressureMonitor({
232+
enabled: true,
233+
source: new RedisBackpressureSignalSource(
234+
this.backpressureRedis,
235+
env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY
236+
),
269237
refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS,
270238
maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS,
271239
rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS,
272240
dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN,
273-
});
274-
}
275-
276-
this.backpressureMonitor = new BackpressureMonitor({
277-
enabled: true,
278-
source,
279-
refreshIntervalMs,
280-
maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS,
281-
rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS,
241+
logger: this.logger,
242+
metrics: new BackpressureMetrics({ register }),
243+
})
244+
);
245+
this.logger.log("🛑 Dequeue backpressure enabled (redis source)", {
246+
key: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY,
247+
refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS,
282248
dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN,
283-
logger: this.logger,
284-
metrics: new BackpressureMetrics({ register }),
249+
});
250+
}
251+
252+
// Pod-count source (in-process apiserver scrape). Namespaced metrics so the
253+
// redis source's metric names are preserved.
254+
if (env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED) {
255+
if (
256+
env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE >=
257+
env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE
258+
) {
259+
throw new Error(
260+
"TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE must be less than TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE"
261+
);
262+
}
263+
const podCountGauge = new Gauge({
264+
name: "supervisor_cluster_pod_count",
265+
help: "Total pod objects stored in the cluster, scraped for backpressure",
266+
registers: [register],
267+
});
268+
this.backpressureMonitors.push(
269+
new BackpressureMonitor({
270+
enabled: true,
271+
source: new K8sPodCountSignalSource({
272+
fetchMetrics: createApiserverMetricsFetcher(),
273+
engageThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE,
274+
releaseThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE,
275+
reportPodCount: (count) => podCountGauge.set(count),
276+
}),
277+
refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS,
278+
maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS,
279+
rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS,
280+
dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_DRY_RUN,
281+
logger: this.logger,
282+
metrics: new BackpressureMetrics({ register, prefix: "supervisor_backpressure_pod_count" }),
283+
})
284+
);
285+
this.logger.log("🛑 Dequeue backpressure enabled (pod-count source)", {
286+
engage: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE,
287+
release: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE,
288+
refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS,
289+
dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_DRY_RUN,
285290
});
286291
}
287292

@@ -308,14 +313,14 @@ class ManagedSupervisor {
308313
dampingFactor: env.TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR,
309314
// Freeze scale-up while backpressure is hard-engaged (not during the resume
310315
// ramp). Evaluates to false when no backpressure monitors are configured → no effect on scaling.
311-
shouldPauseScaling: () => this.backpressureMonitor?.isEngaged() ?? false,
316+
shouldPauseScaling: () => this.backpressureMonitors.some((m) => m.isEngaged()),
312317
},
313318
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
314319
heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS,
315320
sendRunDebugLogs: env.SEND_RUN_DEBUG_LOGS,
316321
preDequeue: async () => {
317-
// Synchronous, hot-path-safe cached read; undefined when backpressure is disabled.
318-
const skipForBackpressure = this.backpressureMonitor?.shouldSkipDequeue() ?? false;
322+
// Synchronous, hot-path-safe cached read; false when no monitors are active.
323+
const skipForBackpressure = this.backpressureMonitors.some((m) => m.shouldSkipDequeue());
319324

320325
if (!env.RESOURCE_MONITOR_ENABLED || this.isKubernetes) {
321326
// Resource monitor is not used in k8s; backpressure is the only gate there.
@@ -710,7 +715,7 @@ class ManagedSupervisor {
710715
this.logger.log("Starting up");
711716

712717
// Optional services
713-
this.backpressureMonitor?.start();
718+
this.backpressureMonitors.forEach((m) => m.start());
714719
await this.podCleaner?.start();
715720
await this.failedPodHandler?.start();
716721
await this.metricsServer?.start();
@@ -738,7 +743,7 @@ class ManagedSupervisor {
738743
await this.workerSession.stop();
739744

740745
// Optional services
741-
this.backpressureMonitor?.stop();
746+
this.backpressureMonitors.forEach((m) => m.stop());
742747
await this.backpressureRedis?.quit();
743748
await this.podCleaner?.stop();
744749
await this.failedPodHandler?.stop();

0 commit comments

Comments
 (0)