Skip to content

Commit e2eefd8

Browse files
committed
fix(supervisor): source-aware backpressure config, 5s scrape interval, parser hardening
1 parent f7b3e7f commit e2eefd8

6 files changed

Lines changed: 67 additions & 5 deletions

File tree

apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ describe("parsePodCount", () => {
2424

2525
it("throws when the pods metric is absent", () => {
2626
const text = 'apiserver_storage_objects{resource="configmaps"} 17';
27+
expect(() => parsePodCount(text)).toThrow(/not found/);
28+
});
29+
30+
it("throws on a non-finite value (e.g. 1e999)", () => {
31+
const text = 'apiserver_storage_objects{resource="pods"} 1e999';
32+
expect(() => parsePodCount(text)).toThrow();
33+
});
34+
35+
it("throws on a negative value", () => {
36+
const text = 'apiserver_storage_objects{resource="pods"} -5';
2737
expect(() => parsePodCount(text)).toThrow();
2838
});
2939
});

apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { BackpressureSignalSource, BackpressureVerdict } from "./backpressureMonitor.js";
22

33
// Reads the apiserver's stored-pod-object count from a Prometheus /metrics scrape.
4-
const POD_COUNT_RE = /^apiserver_storage_objects\{[^}]*resource="pods"[^}]*\}\s+([0-9.eE+-]+)/m;
4+
const POD_COUNT_RE = /^apiserver_storage_objects\{[^}]*resource="pods"[^}]*\}\s+([0-9.eE+]+)/m;
55

66
export function parsePodCount(metricsText: string): number {
77
const match = metricsText.match(POD_COUNT_RE);

apps/supervisor/src/clients/kubernetes.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export function createApiserverMetricsFetcher(): () => Promise<string> {
6868
throw new Error("no current cluster in kubeconfig");
6969
}
7070
const requestInit = await kubeConfig.applyToFetchOptions({ method: "GET" });
71+
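// requestInit is typed as node-fetch's RequestInit while this fetch call takes the DOM RequestInit; the two are structurally compatible and differ only in their declarations, hence the double cast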
7172
const response = await fetch(`${cluster.server}/metrics`, requestInit as unknown as RequestInit);
7273
if (!response.ok) {
7374
throw new Error(`apiserver /metrics scrape failed: ${response.status}`);

apps/supervisor/src/env.test.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { describe, it, expect, vi } from "vitest";
2+
3+
// Mock std-env before importing env.ts so the module-level `Env.parse(stdEnv)`
4+
// doesn't fail in a test environment that lacks required vars.
5+
vi.mock("std-env", () => ({
6+
env: {
7+
TRIGGER_API_URL: "http://localhost:3030",
8+
TRIGGER_WORKER_TOKEN: "test-token",
9+
MANAGED_WORKER_SECRET: "test-secret",
10+
OTEL_EXPORTER_OTLP_ENDPOINT: "http://localhost:4318",
11+
},
12+
}));
13+
14+
const { Env } = await import("./env.js"); // loaded dynamically, after the std-env mock above is declared
15+
16+
// Minimal env that satisfies all required fields; everything else has defaults.
17+
const base = { // same four values as the mocked std-env above
18+
TRIGGER_API_URL: "http://localhost:3030",
19+
TRIGGER_WORKER_TOKEN: "test-token",
20+
MANAGED_WORKER_SECRET: "test-secret",
21+
OTEL_EXPORTER_OTLP_ENDPOINT: "http://localhost:4318",
22+
};
23+
24+
describe("Env superRefine - backpressure source awareness", () => { // both cases enable backpressure and differ only in the source
25+
it("accepts k8s-pod-count source without a Redis host", () => {
26+
expect(() =>
27+
Env.parse({
28+
...base,
29+
TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true",
30+
TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: "k8s-pod-count", // no TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST supplied
31+
})
32+
).not.toThrow();
33+
});
34+
35+
it("rejects redis source when Redis host is absent", () => {
36+
expect(() =>
37+
Env.parse({
38+
...base,
39+
TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true",
40+
TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: "redis", // redis source, still no TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST
41+
})
42+
).toThrow(); // any thrown error satisfies this; the message is not asserted
43+
});
44+
});

apps/supervisor/src/env.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { env as stdEnv } from "std-env";
33
import { z } from "zod";
44
import { AdditionalEnvVars, BoolEnv } from "./envUtil.js";
55

6-
const Env = z
6+
export const Env = z
77
.object({
88
// This will come from `spec.nodeName` in k8s
99
TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()),
@@ -79,6 +79,7 @@ const Env = z
7979
TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: z.enum(["redis", "k8s-pod-count"]).default("redis"),
8080
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE: z.coerce.number().int().positive().default(10_000),
8181
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE: z.coerce.number().int().positive().default(5_000),
82+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS: z.coerce.number().int().positive().default(5_000),
8283

8384
// Optional services
8485
TRIGGER_WARM_START_URL: z.string().optional(),
@@ -332,7 +333,11 @@ const Env = z
332333
path: ["TRIGGER_WORKLOAD_API_DOMAIN"],
333334
});
334335
}
335-
if (data.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED && !data.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST) {
336+
if (
337+
data.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED &&
338+
data.TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE === "redis" &&
339+
!data.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST
340+
) {
336341
ctx.addIssue({
337342
code: z.ZodIssueCode.custom,
338343
message:

apps/supervisor/src/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,10 @@ class ManagedSupervisor {
219219

220220
if (env.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED) {
221221
let source: BackpressureSignalSource;
222+
let refreshIntervalMs = env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS;
222223

223224
if (env.TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE === "k8s-pod-count") {
225+
refreshIntervalMs = env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS;
224226
if (
225227
env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE >=
226228
env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE
@@ -243,7 +245,7 @@ class ManagedSupervisor {
243245
this.logger.log("🛑 Dequeue backpressure enabled (pod-count source)", {
244246
engage: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE,
245247
release: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE,
246-
refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS,
248+
refreshIntervalMs,
247249
dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN,
248250
});
249251
} else {
@@ -274,7 +276,7 @@ class ManagedSupervisor {
274276
this.backpressureMonitor = new BackpressureMonitor({
275277
enabled: true,
276278
source,
277-
refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS,
279+
refreshIntervalMs,
278280
maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS,
279281
rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS,
280282
dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN,

0 commit comments

Comments
 (0)