Skip to content

Commit b0e3be3

Browse files
committed
feat: add resource-scoped jobs plugin for Databricks Lakeflow Jobs
Jobs are configured as named resources (DATABRICKS_JOB_<KEY> env vars) and discovered at startup, following the files plugin pattern.

The API is scoped to configured jobs:
  - appkit.jobs('etl').runNow()
  - appkit.jobs('etl').runNowAndWait()
  - appkit.jobs('etl').lastRun()
  - appkit.jobs('etl').listRuns()
  - appkit.jobs('etl').asUser(req).runNow()

A single-job shorthand is available via the DATABRICKS_JOB_ID env var. On-behalf-of (OBO) access is supported via asUser(req).

Co-authored-by: Isaac
Signed-off-by: Evgenii Kniazev <evgenii.kniazev@databricks.com>
1 parent f94b3e2 commit b0e3be3

11 files changed

Lines changed: 1210 additions & 1 deletion

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * from "./files";
22
export * from "./genie";
3+
export * from "./jobs";
34
export * from "./lakebase";
45
export * from "./lakebase-v1";
56
export * from "./sql-warehouse";
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
import {
2+
Context,
3+
type jobs,
4+
type WorkspaceClient,
5+
} from "@databricks/sdk-experimental";
6+
import { AppKitError, ExecutionError } from "../../errors";
7+
import { createLogger } from "../../logging/logger";
8+
import type { TelemetryProvider } from "../../telemetry";
9+
import {
10+
type Counter,
11+
type Histogram,
12+
type Span,
13+
SpanKind,
14+
SpanStatusCode,
15+
TelemetryManager,
16+
} from "../../telemetry";
17+
import type { JobsConnectorConfig } from "./types";
18+
19+
const logger = createLogger("connectors:jobs");
20+
21+
/**
 * Thin connector around the Jobs API exposed by `WorkspaceClient.jobs`.
 *
 * Every API call goes through {@link JobsConnector._callApi}, which wraps it
 * in a CLIENT span, records call count / duration metrics, and converts
 * non-AppKit failures into `ExecutionError.statementFailed`.
 */
export class JobsConnector {
  private readonly name = "jobs";
  private readonly config: JobsConnectorConfig;
  private readonly telemetry: TelemetryProvider;
  private readonly telemetryMetrics: {
    apiCallCount: Counter;
    apiCallDuration: Histogram;
  };

  /**
   * @param config - Connector options; `config.telemetry` is forwarded to
   *   `TelemetryManager.getProvider` and `config.timeout` is the fallback
   *   timeout used by {@link JobsConnector.waitForRun}.
   */
  constructor(config: JobsConnectorConfig) {
    this.config = config;
    this.telemetry = TelemetryManager.getProvider(
      this.name,
      this.config.telemetry,
    );

    const meter = this.telemetry.getMeter();
    this.telemetryMetrics = {
      apiCallCount: meter.createCounter("jobs.api_call.count", {
        description: "Total number of Jobs API calls",
        unit: "1",
      }),
      apiCallDuration: meter.createHistogram("jobs.api_call.duration", {
        description: "Duration of Jobs API calls",
        unit: "ms",
      }),
    };
  }

  /**
   * Calls `jobs.submit` and returns the waiter's initial response without
   * waiting on the waiter itself.
   *
   * @param workspaceClient - Client used to perform the call.
   * @param request - Submit request forwarded unchanged.
   * @param signal - Optional abort signal bridged into the SDK context.
   */
  async submitRun(
    workspaceClient: WorkspaceClient,
    request: jobs.SubmitRun,
    signal?: AbortSignal,
  ): Promise<jobs.SubmitRunResponse> {
    return this._callApi("submit", async () => {
      const waiter = await workspaceClient.jobs.submit(
        request,
        this._createContext(signal),
      );
      return waiter.response;
    });
  }

  /**
   * Calls `jobs.runNow` and returns the waiter's initial response without
   * waiting on the waiter itself.
   *
   * @param workspaceClient - Client used to perform the call.
   * @param request - Run-now request forwarded unchanged.
   * @param signal - Optional abort signal bridged into the SDK context.
   */
  async runNow(
    workspaceClient: WorkspaceClient,
    request: jobs.RunNow,
    signal?: AbortSignal,
  ): Promise<jobs.RunNowResponse> {
    return this._callApi("runNow", async () => {
      const waiter = await workspaceClient.jobs.runNow(
        request,
        this._createContext(signal),
      );
      return waiter.response;
    });
  }

  /**
   * Fetches a single run via `jobs.getRun`.
   *
   * @param workspaceClient - Client used to perform the call.
   * @param request - Get-run request forwarded unchanged.
   * @param signal - Optional abort signal bridged into the SDK context.
   */
  async getRun(
    workspaceClient: WorkspaceClient,
    request: jobs.GetRunRequest,
    signal?: AbortSignal,
  ): Promise<jobs.Run> {
    return this._callApi("getRun", async () => {
      return workspaceClient.jobs.getRun(request, this._createContext(signal));
    });
  }

  /**
   * Fetches the output of a run via `jobs.getRunOutput`.
   *
   * @param workspaceClient - Client used to perform the call.
   * @param request - Get-run-output request forwarded unchanged.
   * @param signal - Optional abort signal bridged into the SDK context.
   */
  async getRunOutput(
    workspaceClient: WorkspaceClient,
    request: jobs.GetRunOutputRequest,
    signal?: AbortSignal,
  ): Promise<jobs.RunOutput> {
    return this._callApi("getRunOutput", async () => {
      return workspaceClient.jobs.getRunOutput(
        request,
        this._createContext(signal),
      );
    });
  }

  /**
   * Calls `jobs.cancelRun`; the waiter's response is discarded.
   *
   * @param workspaceClient - Client used to perform the call.
   * @param request - Cancel request forwarded unchanged.
   * @param signal - Optional abort signal bridged into the SDK context.
   */
  async cancelRun(
    workspaceClient: WorkspaceClient,
    request: jobs.CancelRun,
    signal?: AbortSignal,
  ): Promise<void> {
    await this._callApi("cancelRun", async () => {
      const waiter = await workspaceClient.jobs.cancelRun(
        request,
        this._createContext(signal),
      );
      return waiter.response;
    });
  }

  /**
   * Collects runs from the `jobs.listRuns` async iterator into an array.
   *
   * `request.limit` is forwarded to the API as part of the request and is
   * also applied here as a cap on the total number of collected runs; a
   * missing or zero limit collects everything the iterator yields.
   *
   * @param workspaceClient - Client used to perform the call.
   * @param request - List-runs request forwarded unchanged.
   * @param signal - Optional abort signal bridged into the SDK context.
   */
  async listRuns(
    workspaceClient: WorkspaceClient,
    request: jobs.ListRunsRequest,
    signal?: AbortSignal,
  ): Promise<jobs.BaseRun[]> {
    return this._callApi("listRuns", async () => {
      const runs: jobs.BaseRun[] = [];
      const limit = request.limit;
      for await (const run of workspaceClient.jobs.listRuns(
        request,
        this._createContext(signal),
      )) {
        runs.push(run);
        if (limit && runs.length >= limit) break;
      }
      return runs;
    });
  }

  /**
   * Fetches a job definition via `jobs.get`.
   *
   * @param workspaceClient - Client used to perform the call.
   * @param request - Get-job request forwarded unchanged.
   * @param signal - Optional abort signal bridged into the SDK context.
   */
  async getJob(
    workspaceClient: WorkspaceClient,
    request: jobs.GetJobRequest,
    signal?: AbortSignal,
  ): Promise<jobs.Job> {
    return this._callApi("getJob", async () => {
      return workspaceClient.jobs.get(request, this._createContext(signal));
    });
  }

  /**
   * Creates a job via `jobs.create`.
   *
   * @param workspaceClient - Client used to perform the call.
   * @param request - Create-job request forwarded unchanged.
   * @param signal - Optional abort signal bridged into the SDK context.
   */
  async createJob(
    workspaceClient: WorkspaceClient,
    request: jobs.CreateJob,
    signal?: AbortSignal,
  ): Promise<jobs.CreateResponse> {
    return this._callApi("createJob", async () => {
      return workspaceClient.jobs.create(request, this._createContext(signal));
    });
  }

  /**
   * Polls {@link JobsConnector.getRun} until the run's `life_cycle_state` is
   * `TERMINATED`, `SKIPPED` or `INTERNAL_ERROR`, then returns that run.
   *
   * The wait between polls is interrupted as soon as `signal` aborts and is
   * never longer than the time left before the timeout, so neither
   * cancellation nor the timeout is delayed by a full poll interval.
   *
   * @param workspaceClient - Client used to perform the polling calls.
   * @param runId - Identifier of the run to wait for.
   * @param pollIntervalMs - Delay between polls in milliseconds (default 5000).
   * @param timeoutMs - Overall timeout in milliseconds; falls back to
   *   `config.timeout`, then to 600000.
   * @param signal - Optional abort signal; aborting cancels the wait.
   * @returns The run as returned by the poll that observed a final state.
   * @throws `ExecutionError.statementFailed` when the timeout elapses or a
   *   non-AppKit error occurs; `ExecutionError.canceled` when `signal` aborts.
   */
  async waitForRun(
    workspaceClient: WorkspaceClient,
    runId: number,
    pollIntervalMs = 5000,
    timeoutMs?: number,
    signal?: AbortSignal,
  ): Promise<jobs.Run> {
    const startTime = Date.now();
    const timeout = timeoutMs ?? this.config.timeout ?? 600000;

    return this.telemetry.startActiveSpan(
      "jobs.waitForRun",
      {
        kind: SpanKind.CLIENT,
        attributes: {
          "jobs.run_id": runId,
          "jobs.poll_interval_ms": pollIntervalMs,
          "jobs.timeout_ms": timeout,
        },
      },
      async (span: Span) => {
        try {
          let pollCount = 0;

          while (true) {
            pollCount++;
            const elapsed = Date.now() - startTime;

            if (elapsed > timeout) {
              throw ExecutionError.statementFailed(
                `Job run ${runId} polling timeout after ${timeout}ms`,
              );
            }

            if (signal?.aborted) {
              throw ExecutionError.canceled();
            }

            span.addEvent("poll.attempt", {
              "poll.count": pollCount,
              "poll.elapsed_ms": elapsed,
            });

            const run = await this.getRun(
              workspaceClient,
              { run_id: runId },
              signal,
            );

            const lifeCycleState = run.state?.life_cycle_state;

            if (
              lifeCycleState === "TERMINATED" ||
              lifeCycleState === "SKIPPED" ||
              lifeCycleState === "INTERNAL_ERROR"
            ) {
              span.setAttribute("jobs.final_state", lifeCycleState);
              span.setAttribute(
                "jobs.result_state",
                run.state?.result_state ?? "",
              );
              span.setAttribute("jobs.poll_count", pollCount);
              span.setStatus({ code: SpanStatusCode.OK });
              return run;
            }

            // Sleep at most until just past the deadline so the timeout check
            // at the top of the loop fires on time instead of up to one full
            // poll interval late.
            const remaining = timeout - (Date.now() - startTime);
            await this._sleep(
              Math.min(pollIntervalMs, Math.max(remaining, 0) + 1),
              signal,
            );
          }
        } catch (error) {
          throw this._recordFailure(span, error);
        } finally {
          span.end();
        }
      },
      { name: this.name, includePrefix: true },
    );
  }

  /**
   * Runs `fn` inside a CLIENT span named `jobs.<operation>` and records the
   * call count and duration metrics (tagged with `operation` and `success`)
   * once it settles.
   *
   * @param operation - Short operation name used for span and metric labels.
   * @param fn - The API call to execute.
   * @returns Whatever `fn` resolves to.
   * @throws The original error when it is an `AppKitError`, otherwise
   *   `ExecutionError.statementFailed` carrying the original message.
   */
  private async _callApi<T>(
    operation: string,
    fn: () => Promise<T>,
  ): Promise<T> {
    const startTime = Date.now();
    let success = false;

    return this.telemetry.startActiveSpan(
      `jobs.${operation}`,
      {
        kind: SpanKind.CLIENT,
        attributes: {
          "jobs.operation": operation,
        },
      },
      async (span: Span) => {
        try {
          const result = await fn();
          success = true;
          span.setStatus({ code: SpanStatusCode.OK });
          return result;
        } catch (error) {
          throw this._recordFailure(span, error);
        } finally {
          const duration = Date.now() - startTime;
          span.end();
          this.telemetryMetrics.apiCallCount.add(1, {
            operation,
            success: success.toString(),
          });
          this.telemetryMetrics.apiCallDuration.record(duration, {
            operation,
            success: success.toString(),
          });

          logger.event()?.setContext("jobs", {
            operation,
            duration_ms: duration,
            success,
          });
        }
      },
      { name: this.name, includePrefix: true },
    );
  }

  /**
   * Marks `span` as failed and returns the error the caller should throw:
   * the original value when it is already an `AppKitError`, otherwise an
   * `ExecutionError.statementFailed` built from its message.
   */
  private _recordFailure(span: Span, error: unknown): Error {
    const message = error instanceof Error ? error.message : String(error);
    // Non-Error throwables are recorded by their string form rather than
    // being asserted to be an Error.
    span.recordException(error instanceof Error ? error : message);
    span.setStatus({ code: SpanStatusCode.ERROR, message });
    return error instanceof AppKitError
      ? error
      : ExecutionError.statementFailed(message);
  }

  /**
   * Resolves after `ms` milliseconds, or rejects with
   * `ExecutionError.canceled()` as soon as `signal` aborts. The timer and the
   * abort listener are both released whichever happens first.
   */
  private _sleep(ms: number, signal?: AbortSignal): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      if (signal?.aborted) {
        reject(ExecutionError.canceled());
        return;
      }

      const onAbort = () => {
        clearTimeout(timer);
        reject(ExecutionError.canceled());
      };
      const timer = setTimeout(() => {
        signal?.removeEventListener("abort", onAbort);
        resolve();
      }, ms);

      signal?.addEventListener("abort", onAbort, { once: true });
    });
  }

  /**
   * Builds an SDK `Context` whose cancellation token mirrors `signal`.
   *
   * `isCancellationRequested` is a getter so it reports the signal's current
   * state rather than the state at construction time, and a callback
   * registered after the signal has already aborted is invoked immediately
   * (an `abort` listener added at that point would never fire).
   */
  private _createContext(signal?: AbortSignal) {
    return new Context({
      cancellationToken: {
        get isCancellationRequested(): boolean {
          return signal?.aborted ?? false;
        },
        onCancellationRequested: (cb: () => void) => {
          if (!signal) return;
          if (signal.aborted) {
            cb();
            return;
          }
          signal.addEventListener("abort", cb, { once: true });
        },
      },
    });
  }
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export { JobsConnector } from "./client";
2+
export type { JobsConnectorConfig } from "./types";
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import type { TelemetryOptions } from "shared";
2+
3+
/** Options accepted by the `JobsConnector` constructor. */
export interface JobsConnectorConfig {
  /** Telemetry options handed to the connector's telemetry provider. */
  telemetry?: TelemetryOptions;
  /**
   * Fallback timeout, in milliseconds, used when waiting for a run and no
   * explicit timeout is passed to the call.
   */
  timeout?: number;
}

packages/appkit/src/index.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export type {
1515
} from "shared";
1616
export { isSQLTypeMarker, sql } from "shared";
1717
export { CacheManager } from "./cache";
18+
export type { JobsConnectorConfig } from "./connectors/jobs";
1819
export type {
1920
DatabaseCredential,
2021
GenerateDatabaseCredentialRequest,
@@ -48,7 +49,21 @@ export {
4849
} from "./errors";
4950
// Plugin authoring
5051
export { Plugin, type ToPlugin, toPlugin } from "./plugin";
51-
export { analytics, files, genie, lakebase, server } from "./plugins";
52+
export {
53+
analytics,
54+
files,
55+
genie,
56+
jobs,
57+
lakebase,
58+
server,
59+
} from "./plugins";
60+
export type {
61+
IJobsConfig,
62+
JobAPI,
63+
JobConfig,
64+
JobHandle,
65+
JobsExport,
66+
} from "./plugins/jobs";
5267
// Registry types and utilities for plugin manifests
5368
export type {
5469
ConfigSchema,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * from "./analytics";
22
export * from "./files";
33
export * from "./genie";
4+
export * from "./jobs";
45
export * from "./lakebase";
56
export * from "./server";
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
export { jobs } from "./plugin";
2+
export type {
3+
IJobsConfig,
4+
JobAPI,
5+
JobConfig,
6+
JobHandle,
7+
JobsExport,
8+
} from "./types";

0 commit comments

Comments
 (0)