Skip to content

Commit eba7141

Browse files
committed
feat(supervisor): forward Baggage outbound on compute calls
1 parent 06e39d4 commit eba7141

4 files changed

Lines changed: 91 additions & 4 deletions

File tree

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { describe, it, expect } from "vitest";
2+
import { encodeBaggage } from "./baggage.js";
3+
4+
describe("encodeBaggage", () => {
5+
it("returns empty string for an empty map", () => {
6+
expect(encodeBaggage({})).toBe("");
7+
});
8+
9+
it("encodes a single entry as k=v", () => {
10+
expect(encodeBaggage({ run_id: "run-1" })).toBe("run_id=run-1");
11+
});
12+
13+
it("sorts keys for stable output across hops", () => {
14+
expect(encodeBaggage({ b: "2", a: "1", c: "3" })).toBe("a=1,b=2,c=3");
15+
});
16+
17+
it("skips empty keys and empty values", () => {
18+
expect(encodeBaggage({ "": "v", k: "", real: "x" })).toBe("real=x");
19+
});
20+
21+
it("truncates values longer than the cap", () => {
22+
const long = "x".repeat(1024);
23+
const got = encodeBaggage({ k: long });
24+
const value = got.slice("k=".length);
25+
expect(value.length).toBe(256);
26+
});
27+
28+
it("caps the number of entries", () => {
29+
const meta: Record<string, string> = {};
30+
for (let i = 0; i < 50; i++) {
31+
// Sortable two-digit keys so we know which 32 survive.
32+
meta[`k${String(i).padStart(2, "0")}`] = "v";
33+
}
34+
const got = encodeBaggage(meta);
35+
expect(got.split(",").length).toBe(32);
36+
});
37+
});
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* W3C Baggage (https://www.w3.org/TR/baggage/) encoding for outbound peer
3+
* calls. Serialises a State's `meta` map into a `Baggage` header value so
4+
* the downstream service auto-stamps the same labels onto its own wide
5+
* events - even on early-error paths that bail before parsing the request
6+
* body.
7+
*
8+
* Outbound discipline: only call this on peer-to-peer hops within the trust
9+
* boundary. External-endpoint calls (image registries, cloud-provider
10+
* APIs, third-party webhooks) must not include the Baggage header.
11+
*/
12+
13+
/**
14+
* Cap the number of entries serialised onto the header. A misbehaving
15+
* caller's `meta` map shouldn't blow up downstream event width.
16+
*/
17+
const MAX_BAGGAGE_ENTRIES = 32;
18+
19+
/**
20+
* Cap each value's length. Defense against an upstream that stuffs
21+
* unbounded payloads into a meta value.
22+
*/
23+
const MAX_BAGGAGE_VALUE_BYTES = 256;
24+
25+
/**
26+
* Encode a `meta` map as a Baggage header value (`k1=v1,k2=v2`). Keys are
27+
* sorted for stable output across hops; an empty input yields the empty
28+
* string so the caller can skip emitting the header entirely.
29+
*/
30+
export function encodeBaggage(meta: Record<string, string>): string {
31+
const entries = Object.entries(meta).filter(([k, v]) => k && v);
32+
if (entries.length === 0) return "";
33+
34+
entries.sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
35+
36+
const out: string[] = [];
37+
for (const [k, raw] of entries) {
38+
if (out.length >= MAX_BAGGAGE_ENTRIES) break;
39+
const v = raw.length > MAX_BAGGAGE_VALUE_BYTES ? raw.slice(0, MAX_BAGGAGE_VALUE_BYTES) : raw;
40+
out.push(`${k}=${v}`);
41+
}
42+
return out.join(",");
43+
}

apps/supervisor/src/wideEvents/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ export {
2626
type WideEventOptions,
2727
} from "./middleware.js";
2828
export type { ErrorInfo, PhaseRecord, State } from "./state.js";
29+
export { encodeBaggage } from "./baggage.js";

apps/supervisor/src/workloadManager/compute.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { ComputeClient, stripImageDigest } from "@internal/compute";
1010
import { extractTraceparent, getRunnerId } from "../util.js";
1111
import type { OtlpTraceService } from "../services/otlpTraceService.js";
1212
import { tryCatch } from "@trigger.dev/core";
13-
import { fromContext } from "../wideEvents/index.js";
13+
import { encodeBaggage, fromContext } from "../wideEvents/index.js";
1414

1515
type ComputeWorkloadManagerOptions = WorkloadManagerOptions & {
1616
gateway: {
@@ -49,16 +49,22 @@ export class ComputeWorkloadManager implements WorkloadManager {
4949
timeoutMs: opts.gateway.timeoutMs,
5050
// Forward the current wide-event scope's traceparent + request_id so the
5151
// downstream service continues the same trace and joins its own wide
52-
// events to ours. When called outside a wide-event scope (or when wide
53-
// events are disabled), `fromContext` returns undefined and propagation
54-
// is skipped.
52+
// events to ours. Additionally serialize caller-supplied meta labels
53+
// into the W3C Baggage header so the downstream service auto-stamps
54+
// them even on early-error paths that bail before parsing the body.
55+
// When called outside a wide-event scope (or when wide events are
56+
// disabled), `fromContext` returns undefined and propagation is skipped.
5557
getPropagationHeaders: () => {
5658
const state = fromContext();
5759
if (!state) return {};
5860
const headers: Record<string, string> = { "x-request-id": state.requestId };
5961
if (state.traceparent) {
6062
headers.traceparent = state.traceparent;
6163
}
64+
const baggage = encodeBaggage(state.meta);
65+
if (baggage) {
66+
headers.baggage = baggage;
67+
}
6268
return headers;
6369
},
6470
});

0 commit comments

Comments
 (0)