Skip to content

Commit 5a3a442

Browse files
committed
feat(supervisor): wide-event module + kill switch
1 parent 436b7a9 commit 5a3a442

14 files changed

Lines changed: 1058 additions & 0 deletions

File tree

apps/supervisor/src/env.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,11 @@ const Env = z
256256
// Debug
257257
DEBUG: BoolEnv.default(false),
258258
SEND_RUN_DEBUG_LOGS: BoolEnv.default(false),
259+
260+
// Wide-event observability - off by default. Emits one flat-keyed JSON
261+
// line per natural unit of work (dequeue iteration, HTTP request, socket
262+
// lifecycle). High-QPS hotpath, so the kill switch must be honoured.
263+
TRIGGER_WIDE_EVENTS_ENABLED: BoolEnv.default(false),
259264
})
260265
.superRefine((data, ctx) => {
261266
if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) {
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { AsyncLocalStorage } from "node:async_hooks";
2+
import type { State } from "./state.js";
3+
4+
/**
 * AsyncLocalStorage threading per-operation `State` through the call stack.
 * Wrappers enter a state via `wideEventStorage.run(state, () => fn())` and
 * any code in the async call tree retrieves it via `fromContext()`.
 *
 * When no state has been entered the store is empty; `fromContext()` reports
 * that case as `null`.
 */
export const wideEventStorage = new AsyncLocalStorage<State>();
10+
11+
/**
 * Looks up the wide-event `State` bound to the current async context.
 *
 * @returns The stored `State`, or `null` when the store holds nothing for the
 *   current context.
 */
export function fromContext(): State | null {
  const active = wideEventStorage.getStore();
  // `== null` covers both the "no store" (undefined) and null cases.
  if (active == null) {
    return null;
  }
  return active;
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import { describe, it, expect } from "vitest";
2+
import { emit, EmitMessage } from "./emit.js";
3+
import { newState } from "./new.js";
4+
5+
/**
 * Runs `emit(state)` with `process.stdout.write` temporarily replaced by a
 * recorder, then returns the one recorded line parsed as JSON.
 *
 * Asserts that exactly one write happened and throws if that line is empty.
 */
function captureEmit(state: Parameters<typeof emit>[0]): Record<string, unknown> {
  const lines: string[] = [];
  const realWrite = process.stdout.write;
  const recorder = ((chunk: unknown) => {
    lines.push(String(chunk));
    return true;
  }) as typeof process.stdout.write;

  process.stdout.write = recorder;
  try {
    emit(state);
  } finally {
    // Put the real writer back even when emit throws.
    process.stdout.write = realWrite;
  }

  expect(lines).toHaveLength(1);
  const [first] = lines;
  if (!first) {
    throw new Error("no captured line");
  }
  return JSON.parse(first) as Record<string, unknown>;
}
22+
23+
// Behavioural tests for `emit`: each case builds a State, captures the single
// JSON line written to stdout and checks the flattened keys.
describe("emit", () => {
  /** Baseline state every case starts from. */
  const freshState = () => newState({ service: "supervisor", env: {} });

  /** Baseline state already marked as a successful 200. */
  const okState = () => {
    const state = freshState();
    state.statusCode = 200;
    state.ok = true;
    return state;
  };

  it("emits a single line with the stable message + request_id", () => {
    const state = okState();
    state.durationMs = 5;

    const event = captureEmit(state);

    expect(event.msg).toBe(EmitMessage);
    expect(event.request_id).toBe(state.requestId);
    expect(event.service).toBe("supervisor");
    expect(event.ok).toBe(true);
    expect(event.status).toBe(200);
    expect(event.duration_ms).toBe(5);
  });

  it("omits empty optional fields", () => {
    const event = captureEmit(okState());

    const absentKeys = ["trace_id", "version", "commit_sha", "instance_id", "error.code"];
    for (const key of absentKeys) {
      expect(event).not.toHaveProperty(key);
    }
  });

  it("flattens meta keys as meta.<key>", () => {
    const state = okState();
    state.meta.run_id = "run_abc";
    state.meta.deployment_id = "dep_xyz";

    const event = captureEmit(state);

    expect(event["meta.run_id"]).toBe("run_abc");
    expect(event["meta.deployment_id"]).toBe("dep_xyz");
    expect(event).not.toHaveProperty("meta");
  });

  it("flattens phases as phase.<name>.<field>", () => {
    const state = okState();
    state.phases.push({ name: "warm_start", durationMs: 12, ok: true, attempts: 1 });
    state.phases.push({
      name: "workload_create",
      durationMs: 3,
      ok: false,
      attempts: 2,
      errorCode: "Error",
      errorMsg: "boom",
      sub: { create_ms: 1 },
    });

    const event = captureEmit(state);

    const expected: Array<[string, unknown]> = [
      ["phase.warm_start.duration_ms", 12],
      ["phase.warm_start.ok", true],
      ["phase.warm_start.attempts", 1],
      ["phase.workload_create.duration_ms", 3],
      ["phase.workload_create.ok", false],
      ["phase.workload_create.attempts", 2],
      ["phase.workload_create.error_code", "Error"],
      ["phase.workload_create.error_message", "boom"],
      ["phase.workload_create.create_ms", 1],
    ];
    for (const [key, value] of expected) {
      expect(event[key]).toBe(value);
    }
  });

  it("includes error.code/message/kind when state.error is set", () => {
    const state = freshState();
    state.statusCode = 500;
    state.error = { code: "InternalError", message: "kaboom", kind: "internal" };

    const event = captureEmit(state);

    expect(event["error.code"]).toBe("InternalError");
    expect(event["error.message"]).toBe("kaboom");
    expect(event["error.kind"]).toBe("internal");
  });

  it("truncates very long error messages", () => {
    const state = freshState();
    state.error = { code: "Big", message: "x".repeat(2000), kind: "internal" };

    const event = captureEmit(state);

    const message = event["error.message"] as string;
    expect(message.length).toBe(512);
  });

  it("flattens extras at the top level", () => {
    const state = okState();
    state.extras.route = "/health";
    state.extras["dispatch.result"] = "hit";

    const event = captureEmit(state);

    expect(event.route).toBe("/health");
    expect(event["dispatch.result"]).toBe("hit");
  });
});
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import type { State } from "./state.js";
2+
3+
/**
 * Stable slog message string for every wide event. Downstream filters (jq,
 * Axiom queries, Vector pipelines) pin to this constant. The `service` field
 * disambiguates which service emitted it.
 */
export const EmitMessage = "wide_event";

/**
 * Upper bound applied to `error.message` before it is emitted. Despite the
 * name, `emit` compares this against `String.length` (UTF-16 code units),
 * not an encoded byte count.
 */
const MAX_ERROR_MSG_BYTES = 512;
11+
12+
/**
 * Serializes a State as a single flat-keyed JSON line on stdout. Keys are
 * flat (no nested objects) to keep jq filtering and Axiom indexing cheap.
 * Empty optional fields are omitted.
 *
 * Key layout:
 * - core fields (`msg`, `request_id`, `ok`, `duration_ms`, ...) at the top level
 * - `error.<field>` when `state.error` is set
 * - `meta.<key>` for every entry of `state.meta`
 * - `phase.<name>.<field>` for every entry of `state.phases`
 * - every entry of `state.extras` verbatim at the top level
 *
 * Both the top-level error message and per-phase error messages are capped
 * at `MAX_ERROR_MSG_BYTES` so a single oversized message cannot blow up the
 * log line.
 *
 * @param state - The per-operation state to serialize.
 */
export function emit(state: State): void {
  const out: Record<string, unknown> = {
    msg: EmitMessage,
    request_id: state.requestId,
  };

  if (state.traceId) out.trace_id = state.traceId;
  appendIfSet(out, "service", state.service);
  appendIfSet(out, "version", state.version);
  appendIfSet(out, "commit_sha", state.commitSha);
  appendIfSet(out, "region", state.region);
  appendIfSet(out, "node_id", state.nodeId);
  appendIfSet(out, "instance_id", state.instanceId);

  out.ok = state.ok;
  // A status of 0 is treated as "not set" and left out of the event.
  if (state.statusCode !== 0) out.status = state.statusCode;
  out.duration_ms = state.durationMs;

  if (state.error) {
    appendIfSet(out, "error.code", state.error.code);
    appendIfSet(out, "error.message", clipErrorMessage(state.error.message));
    appendIfSet(out, "error.kind", state.error.kind);
  }

  for (const [k, v] of Object.entries(state.meta)) {
    out["meta." + k] = v;
  }

  for (const p of state.phases) {
    const prefix = "phase." + p.name + ".";
    out[prefix + "duration_ms"] = p.durationMs;
    out[prefix + "ok"] = p.ok;
    out[prefix + "attempts"] = p.attempts;
    if (p.errorCode) out[prefix + "error_code"] = p.errorCode;
    // Same cap as the top-level error message.
    if (p.errorMsg) out[prefix + "error_message"] = clipErrorMessage(p.errorMsg);
    if (p.sub) {
      for (const [sk, sv] of Object.entries(p.sub)) {
        out[prefix + sk] = sv;
      }
    }
  }

  // Extras are written last, so an extra whose key matches an earlier field
  // replaces that field's value.
  for (const [k, v] of Object.entries(state.extras)) {
    out[k] = v;
  }

  process.stdout.write(JSON.stringify(out) + "\n");
}

/**
 * Caps `message` at `MAX_ERROR_MSG_BYTES` UTF-16 code units. If the cut would
 * land between the two halves of a surrogate pair, the dangling high
 * surrogate is dropped so the result never ends in a lone surrogate.
 */
function clipErrorMessage(message: string): string {
  if (message.length <= MAX_ERROR_MSG_BYTES) return message;
  let end = MAX_ERROR_MSG_BYTES;
  const lastUnit = message.charCodeAt(end - 1);
  if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) end -= 1;
  return message.slice(0, end);
}
69+
70+
/**
 * Writes `value` to `out[key]` only when it is truthy, so undefined or empty
 * strings never show up as keys on the emitted event.
 *
 * @param out - Object being assembled for serialization; mutated in place.
 * @param key - Flat key to set.
 * @param value - Candidate value; skipped when undefined or empty.
 */
function appendIfSet(out: Record<string, unknown>, key: string, value: string | undefined): void {
  if (!value) {
    return;
  }
  out[key] = value;
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Wide-event observability surface for the supervisor. One flat-keyed JSON
3+
* line per natural unit of work (HTTP request, dequeue iteration, socket
4+
* lifecycle event). Events join across services via `trace_id` (parsed from
5+
* the inbound W3C `traceparent`) and `meta.run_id`.
6+
*
7+
* Off by default behind a kill switch - the dispatch hotpath runs at high
8+
* QPS, so logging pressure must be cleanly removable.
9+
*/
10+
export { type Env, isValidRequestId, newState, type NewStateOptions } from "./new.js";
11+
export { emit, EmitMessage } from "./emit.js";
12+
export { parseTraceId } from "./traceparent.js";
13+
export { fromContext, wideEventStorage } from "./context.js";
14+
export {
15+
type PhaseOpt,
16+
recordPhase,
17+
recordPhaseSince,
18+
timePhase,
19+
} from "./record.js";
20+
export {
21+
emitOneShot,
22+
runWideEvent,
23+
setExtra,
24+
setMeta,
25+
type WideEventLifecycleOptions,
26+
type WideEventOptions,
27+
} from "./middleware.js";
28+
export type { ErrorInfo, PhaseRecord, State } from "./state.js";

0 commit comments

Comments
 (0)