Skip to content

Commit 5ffe5f0

Browse files
committed
feat(supervisor): wide events on snapshot lifecycle
1 parent 791e359 commit 5ffe5f0

3 files changed

Lines changed: 90 additions & 15 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: supervisor
3+
type: improvement
4+
---
5+
6+
Add observability events at the schedule, dispatch, and callback phases of the snapshot lifecycle.

apps/supervisor/src/services/computeSnapshotService.ts

Lines changed: 83 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@ import { type SnapshotCallbackPayload } from "@internal/compute";
66
import type { ComputeWorkloadManager } from "../workloadManager/compute.js";
77
import { TimerWheel } from "./timerWheel.js";
88
import type { OtlpTraceService } from "./otlpTraceService.js";
9+
import {
10+
emitOneShot,
11+
fromContext,
12+
recordPhaseSince,
13+
runWideEvent,
14+
setExtra,
15+
setMeta,
16+
type WideEventOptions,
17+
} from "../wideEvents/index.js";
918

1019
type DelayedSnapshot = {
1120
runnerId: string;
@@ -24,6 +33,7 @@ export type ComputeSnapshotServiceOptions = {
2433
computeManager: ComputeWorkloadManager;
2534
workerClient: SupervisorHttpClient;
2635
tracing?: OtlpTraceService;
36+
wideEventOpts: WideEventOptions;
2737
};
2838

2939
export class ComputeSnapshotService {
@@ -37,11 +47,13 @@ export class ComputeSnapshotService {
3747
private readonly computeManager: ComputeWorkloadManager;
3848
private readonly workerClient: SupervisorHttpClient;
3949
private readonly tracing?: OtlpTraceService;
50+
private readonly wideEventOpts: WideEventOptions;
4051

4152
constructor(opts: ComputeSnapshotServiceOptions) {
4253
this.computeManager = opts.computeManager;
4354
this.workerClient = opts.workerClient;
4455
this.tracing = opts.tracing;
56+
this.wideEventOpts = opts.wideEventOpts;
4557

4658
this.dispatchLimit = pLimit(this.computeManager.snapshotDispatchLimit);
4759
this.timerWheel = new TimerWheel<DelayedSnapshot>({
@@ -62,6 +74,16 @@ export class ComputeSnapshotService {
6274
/** Schedule a delayed snapshot for a run. Replaces any pending snapshot for the same run. */
6375
schedule(runFriendlyId: string, data: DelayedSnapshot) {
6476
this.timerWheel.submit(runFriendlyId, data);
77+
emitOneShot({
78+
...this.wideEventOpts,
79+
populate: (state) => {
80+
state.extras.op = "snapshot.schedule";
81+
state.meta.run_id = runFriendlyId;
82+
state.meta.snapshot_id = data.snapshotFriendlyId;
83+
state.extras.runner_id = data.runnerId;
84+
state.extras.delay_ms = this.computeManager.snapshotDelayMs;
85+
},
86+
});
6587
this.logger.debug("Snapshot scheduled", {
6688
runFriendlyId,
6789
snapshotFriendlyId: data.snapshotFriendlyId,
@@ -73,6 +95,13 @@ export class ComputeSnapshotService {
7395
cancel(runFriendlyId: string): boolean {
7496
const cancelled = this.timerWheel.cancel(runFriendlyId);
7597
if (cancelled) {
98+
emitOneShot({
99+
...this.wideEventOpts,
100+
populate: (state) => {
101+
state.extras.op = "snapshot.canceled";
102+
state.meta.run_id = runFriendlyId;
103+
},
104+
});
76105
this.logger.debug("Snapshot cancelled", { runFriendlyId });
77106
}
78107
return cancelled;
@@ -81,6 +110,24 @@ export class ComputeSnapshotService {
81110
/** Handle the callback from the gateway after a snapshot completes or fails. */
82111
async handleCallback(body: SnapshotCallbackPayload) {
83112
const snapshotId = body.status === "completed" ? body.snapshot_id : undefined;
113+
const runId = body.metadata?.runId;
114+
const snapshotFriendlyId = body.metadata?.snapshotFriendlyId;
115+
116+
// Enrich the wrapping route's wide event with snapshot metadata. The
117+
// `/api/v1/compute/snapshot-complete` route is registered with `wideRoute`,
118+
// so `fromContext()` returns the State of that route and these calls
119+
// become extras/meta on the same wide event - no nested emission.
120+
const state = fromContext();
121+
if (state) {
122+
state.extras.op = "snapshot.callback";
123+
state.extras["snapshot.status"] = body.status;
124+
if (body.instance_id) state.extras["snapshot.instance_id"] = body.instance_id;
125+
if (body.duration_ms !== undefined) state.extras["snapshot.duration_ms"] = body.duration_ms;
126+
if (snapshotId) state.extras["snapshot.id"] = snapshotId;
127+
if (body.status === "failed" && body.error) state.extras["snapshot.error"] = body.error;
128+
}
129+
if (runId) setMeta(state, "run_id", runId);
130+
if (snapshotFriendlyId) setMeta(state, "snapshot_id", snapshotFriendlyId);
84131

85132
this.logger.debug("Snapshot callback", {
86133
snapshotId,
@@ -91,9 +138,6 @@ export class ComputeSnapshotService {
91138
durationMs: body.duration_ms,
92139
});
93140

94-
const runId = body.metadata?.runId;
95-
const snapshotFriendlyId = body.metadata?.snapshotFriendlyId;
96-
97141
if (!runId || !snapshotFriendlyId) {
98142
this.logger.error("Snapshot callback missing metadata", { body });
99143
return { ok: false as const, status: 400 };
@@ -102,6 +146,7 @@ export class ComputeSnapshotService {
102146
this.#emitSnapshotSpan(runId, body.duration_ms, snapshotId);
103147

104148
if (body.status === "completed") {
149+
const submitStart = performance.now();
105150
const result = await this.workerClient.submitSuspendCompletion({
106151
runId,
107152
snapshotId: snapshotFriendlyId,
@@ -113,6 +158,11 @@ export class ComputeSnapshotService {
113158
},
114159
},
115160
});
161+
recordPhaseSince(
162+
"submit_completion",
163+
submitStart,
164+
result.success ? undefined : new Error(String(result.error))
165+
);
116166

117167
if (result.success) {
118168
this.logger.debug("Suspend completion submitted", {
@@ -121,13 +171,15 @@ export class ComputeSnapshotService {
121171
snapshotId: body.snapshot_id,
122172
});
123173
} else {
174+
setExtra(state, "submit_completion.error", String(result.error));
124175
this.logger.error("Failed to submit suspend completion", {
125176
runId,
126177
snapshotFriendlyId,
127178
error: result.error,
128179
});
129180
}
130181
} else {
182+
const submitStart = performance.now();
131183
const result = await this.workerClient.submitSuspendCompletion({
132184
runId,
133185
snapshotId: snapshotFriendlyId,
@@ -136,8 +188,14 @@ export class ComputeSnapshotService {
136188
error: body.error ?? "Snapshot failed",
137189
},
138190
});
191+
recordPhaseSince(
192+
"submit_completion",
193+
submitStart,
194+
result.success ? undefined : new Error(String(result.error))
195+
);
139196

140197
if (!result.success) {
198+
setExtra(state, "submit_completion.error", String(result.error));
141199
this.logger.error("Failed to submit suspend failure", {
142200
runId,
143201
snapshotFriendlyId,
@@ -184,20 +242,30 @@ export class ComputeSnapshotService {
184242

185243
/** Dispatch a snapshot request to the gateway. */
186244
private async dispatch(snapshot: DelayedSnapshot): Promise<void> {
187-
const result = await this.computeManager.snapshot({
188-
runnerId: snapshot.runnerId,
189-
metadata: {
190-
runId: snapshot.runFriendlyId,
191-
snapshotFriendlyId: snapshot.snapshotFriendlyId,
245+
await runWideEvent(
246+
{
247+
...this.wideEventOpts,
248+
setup: (state) => {
249+
state.extras.op = "snapshot.dispatch";
250+
state.meta.run_id = snapshot.runFriendlyId;
251+
state.meta.snapshot_id = snapshot.snapshotFriendlyId;
252+
state.extras.runner_id = snapshot.runnerId;
253+
},
192254
},
193-
});
255+
async () => {
256+
const result = await this.computeManager.snapshot({
257+
runnerId: snapshot.runnerId,
258+
metadata: {
259+
runId: snapshot.runFriendlyId,
260+
snapshotFriendlyId: snapshot.snapshotFriendlyId,
261+
},
262+
});
194263

195-
if (!result) {
196-
this.logger.error("Failed to request snapshot", {
197-
runId: snapshot.runFriendlyId,
198-
runnerId: snapshot.runnerId,
199-
});
200-
}
264+
if (!result) {
265+
throw new Error("Snapshot dispatch returned no result");
266+
}
267+
}
268+
);
201269
}
202270

203271
#emitSnapshotSpan(runFriendlyId: string, durationMs?: number, snapshotId?: string) {

apps/supervisor/src/workloadServer/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
124124
computeManager: opts.computeManager,
125125
workerClient: opts.workerClient,
126126
tracing: opts.tracing,
127+
wideEventOpts: this.wideEventOpts,
127128
});
128129
}
129130

0 commit comments

Comments
 (0)