diff --git a/docs/docs/api/appkit/Interface.BasePluginConfig.md b/docs/docs/api/appkit/Interface.BasePluginConfig.md index a7faffc6..3483c136 100644 --- a/docs/docs/api/appkit/Interface.BasePluginConfig.md +++ b/docs/docs/api/appkit/Interface.BasePluginConfig.md @@ -2,6 +2,10 @@ Base configuration interface for AppKit plugins +## Extended by + +- [`IJobsConfig`](Interface.IJobsConfig.md) + ## Indexable ```ts diff --git a/docs/docs/api/appkit/Interface.IJobsConfig.md b/docs/docs/api/appkit/Interface.IJobsConfig.md new file mode 100644 index 00000000..aeff8fa9 --- /dev/null +++ b/docs/docs/api/appkit/Interface.IJobsConfig.md @@ -0,0 +1,79 @@ +# Interface: IJobsConfig + +Configuration for the Jobs plugin. + +## Extends + +- [`BasePluginConfig`](Interface.BasePluginConfig.md) + +## Indexable + +```ts +[key: string]: unknown +``` + +## Properties + +### host? + +```ts +optional host: string; +``` + +#### Inherited from + +[`BasePluginConfig`](Interface.BasePluginConfig.md).[`host`](Interface.BasePluginConfig.md#host) + +*** + +### jobs? + +```ts +optional jobs: Record; +``` + +Named jobs to expose. Each key becomes a job accessor. + +*** + +### name? + +```ts +optional name: string; +``` + +#### Inherited from + +[`BasePluginConfig`](Interface.BasePluginConfig.md).[`name`](Interface.BasePluginConfig.md#name) + +*** + +### pollIntervalMs? + +```ts +optional pollIntervalMs: number; +``` + +Poll interval for waitForRun in milliseconds. Defaults to 5000. + +*** + +### telemetry? + +```ts +optional telemetry: TelemetryOptions; +``` + +#### Inherited from + +[`BasePluginConfig`](Interface.BasePluginConfig.md).[`telemetry`](Interface.BasePluginConfig.md#telemetry) + +*** + +### timeout? + +```ts +optional timeout: number; +``` + +Operation timeout in milliseconds. Defaults to 60000. 
diff --git a/docs/docs/api/appkit/Interface.JobAPI.md b/docs/docs/api/appkit/Interface.JobAPI.md new file mode 100644 index 00000000..5ef330fa --- /dev/null +++ b/docs/docs/api/appkit/Interface.JobAPI.md @@ -0,0 +1,155 @@ +# Interface: JobAPI + +User-facing API for a single configured job. + +## Methods + +### cancelRun() + +```ts +cancelRun(runId: number): Promise>; +``` + +Cancel a specific run. + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `runId` | `number` | + +#### Returns + +`Promise`\<[`ExecutionResult`](TypeAlias.ExecutionResult.md)\<`void`\>\> + +*** + +### getJob() + +```ts +getJob(): Promise>; +``` + +Get the job definition. + +#### Returns + +`Promise`\<[`ExecutionResult`](TypeAlias.ExecutionResult.md)\<`Job`\>\> + +*** + +### getRun() + +```ts +getRun(runId: number): Promise>; +``` + +Get a specific run by ID. + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `runId` | `number` | + +#### Returns + +`Promise`\<[`ExecutionResult`](TypeAlias.ExecutionResult.md)\<`Run`\>\> + +*** + +### getRunOutput() + +```ts +getRunOutput(runId: number): Promise>; +``` + +Get output of a specific run. + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `runId` | `number` | + +#### Returns + +`Promise`\<[`ExecutionResult`](TypeAlias.ExecutionResult.md)\<`RunOutput`\>\> + +*** + +### lastRun() + +```ts +lastRun(): Promise>; +``` + +Get the most recent run for this job. + +#### Returns + +`Promise`\<[`ExecutionResult`](TypeAlias.ExecutionResult.md)\<`BaseRun` \| `undefined`\>\> + +*** + +### listRuns() + +```ts +listRuns(options?: { + limit?: number; +}): Promise>; +``` + +List runs for this job. 
+ +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `options?` | \{ `limit?`: `number`; \} | +| `options.limit?` | `number` | + +#### Returns + +`Promise`\<[`ExecutionResult`](TypeAlias.ExecutionResult.md)\<`BaseRun`[]\>\> + +*** + +### runAndWait() + +```ts +runAndWait(params?: Record, signal?: AbortSignal): AsyncGenerator; +``` + +Trigger and poll until completion, yielding status updates. + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `params?` | `Record`\<`string`, `unknown`\> | +| `signal?` | `AbortSignal` | + +#### Returns + +`AsyncGenerator`\<`JobRunStatus`, `void`, `unknown`\> + +*** + +### runNow() + +```ts +runNow(params?: Record): Promise>; +``` + +Trigger the configured job with validated params. Returns the run response. + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `params?` | `Record`\<`string`, `unknown`\> | + +#### Returns + +`Promise`\<[`ExecutionResult`](TypeAlias.ExecutionResult.md)\<`RunNowResponse`\>\> diff --git a/docs/docs/api/appkit/Interface.JobConfig.md b/docs/docs/api/appkit/Interface.JobConfig.md new file mode 100644 index 00000000..57664c0e --- /dev/null +++ b/docs/docs/api/appkit/Interface.JobConfig.md @@ -0,0 +1,33 @@ +# Interface: JobConfig + +Per-job configuration options. + +## Properties + +### params? + +```ts +optional params: ZodType, unknown, $ZodTypeInternals, unknown>>; +``` + +Optional Zod schema for validating job parameters at runtime. + +*** + +### taskType? + +```ts +optional taskType: TaskType; +``` + +The type of task this job runs. Determines how params are mapped to the SDK request. + +*** + +### waitTimeout? + +```ts +optional waitTimeout: number; +``` + +Maximum time (ms) to poll in runAndWait before giving up. Defaults to 600 000 (10 min). 
diff --git a/docs/docs/api/appkit/Interface.JobsConnectorConfig.md b/docs/docs/api/appkit/Interface.JobsConnectorConfig.md new file mode 100644 index 00000000..4e55a476 --- /dev/null +++ b/docs/docs/api/appkit/Interface.JobsConnectorConfig.md @@ -0,0 +1,9 @@ +# Interface: JobsConnectorConfig + +## Properties + +### telemetry? + +```ts +optional telemetry: TelemetryOptions; +``` diff --git a/docs/docs/api/appkit/TypeAlias.JobHandle.md b/docs/docs/api/appkit/TypeAlias.JobHandle.md new file mode 100644 index 00000000..f20a11b9 --- /dev/null +++ b/docs/docs/api/appkit/TypeAlias.JobHandle.md @@ -0,0 +1,28 @@ +# Type Alias: JobHandle + +```ts +type JobHandle = JobAPI & { + asUser: (req: IAppRequest) => JobAPI; +}; +``` + +Job handle returned by `appkit.jobs("etl")`. +Supports OBO access via `.asUser(req)`. + +## Type Declaration + +### asUser() + +```ts +asUser: (req: IAppRequest) => JobAPI; +``` + +#### Parameters + +| Parameter | Type | +| ------ | ------ | +| `req` | `IAppRequest` | + +#### Returns + +[`JobAPI`](Interface.JobAPI.md) diff --git a/docs/docs/api/appkit/TypeAlias.JobsExport.md b/docs/docs/api/appkit/TypeAlias.JobsExport.md new file mode 100644 index 00000000..3191346b --- /dev/null +++ b/docs/docs/api/appkit/TypeAlias.JobsExport.md @@ -0,0 +1,33 @@ +# Type Alias: JobsExport() + +```ts +type JobsExport = (jobKey: string) => JobHandle; +``` + +Public API shape of the jobs plugin. +Callable to select a job by key. 
+ +## Parameters + +| Parameter | Type | +| ------ | ------ | +| `jobKey` | `string` | + +## Returns + +[`JobHandle`](TypeAlias.JobHandle.md) + +## Example + +```ts +// Trigger a configured job +const { run_id } = await appkit.jobs("etl").runNow(); + +// Trigger and poll until completion +for await (const status of appkit.jobs("etl").runAndWait()) { + console.log(status.status, status.run); +} + +// OBO access +await appkit.jobs("etl").asUser(req).runNow(); +``` diff --git a/docs/docs/api/appkit/index.md b/docs/docs/api/appkit/index.md index f5163db4..d125af87 100644 --- a/docs/docs/api/appkit/index.md +++ b/docs/docs/api/appkit/index.md @@ -35,7 +35,11 @@ plugin architecture, and React integration. | [DatabaseCredential](Interface.DatabaseCredential.md) | Database credentials with OAuth token for Postgres connection | | [EndpointConfig](Interface.EndpointConfig.md) | - | | [GenerateDatabaseCredentialRequest](Interface.GenerateDatabaseCredentialRequest.md) | Request parameters for generating database OAuth credentials | +| [IJobsConfig](Interface.IJobsConfig.md) | Configuration for the Jobs plugin. | | [ITelemetry](Interface.ITelemetry.md) | Plugin-facing interface for OpenTelemetry instrumentation. Provides a thin abstraction over OpenTelemetry APIs for plugins. | +| [JobAPI](Interface.JobAPI.md) | User-facing API for a single configured job. | +| [JobConfig](Interface.JobConfig.md) | Per-job configuration options. | +| [JobsConnectorConfig](Interface.JobsConnectorConfig.md) | - | | [LakebasePoolConfig](Interface.LakebasePoolConfig.md) | Configuration for creating a Lakebase connection pool | | [PluginManifest](Interface.PluginManifest.md) | Plugin manifest that declares metadata and resource requirements. Attached to plugin classes as a static property. Extends the shared PluginManifest with strict resource types. 
| | [RequestedClaims](Interface.RequestedClaims.md) | Optional claims for fine-grained Unity Catalog table permissions When specified, the returned token will be scoped to only the requested tables | @@ -56,6 +60,8 @@ plugin architecture, and React integration. | [ConfigSchema](TypeAlias.ConfigSchema.md) | Configuration schema definition for plugin config. Re-exported from the standard JSON Schema Draft 7 types. | | [ExecutionResult](TypeAlias.ExecutionResult.md) | Discriminated union for plugin execution results. | | [IAppRouter](TypeAlias.IAppRouter.md) | Express router type for plugin route registration | +| [JobHandle](TypeAlias.JobHandle.md) | Job handle returned by `appkit.jobs("etl")`. Supports OBO access via `.asUser(req)`. | +| [JobsExport](TypeAlias.JobsExport.md) | Public API shape of the jobs plugin. Callable to select a job by key. | | [PluginData](TypeAlias.PluginData.md) | Tuple of plugin class, config, and name. Created by `toPlugin()` and passed to `createApp()`. | | [ResourcePermission](TypeAlias.ResourcePermission.md) | Union of all possible permission levels across all resource types. | | [ServingFactory](TypeAlias.ServingFactory.md) | Factory function returned by `AppKit.serving`. 
| diff --git a/docs/docs/api/appkit/typedoc-sidebar.ts b/docs/docs/api/appkit/typedoc-sidebar.ts index 720c78ea..866f21e8 100644 --- a/docs/docs/api/appkit/typedoc-sidebar.ts +++ b/docs/docs/api/appkit/typedoc-sidebar.ts @@ -107,11 +107,31 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/Interface.GenerateDatabaseCredentialRequest", label: "GenerateDatabaseCredentialRequest" }, + { + type: "doc", + id: "api/appkit/Interface.IJobsConfig", + label: "IJobsConfig" + }, { type: "doc", id: "api/appkit/Interface.ITelemetry", label: "ITelemetry" }, + { + type: "doc", + id: "api/appkit/Interface.JobAPI", + label: "JobAPI" + }, + { + type: "doc", + id: "api/appkit/Interface.JobConfig", + label: "JobConfig" + }, + { + type: "doc", + id: "api/appkit/Interface.JobsConnectorConfig", + label: "JobsConnectorConfig" + }, { type: "doc", id: "api/appkit/Interface.LakebasePoolConfig", @@ -193,6 +213,16 @@ const typedocSidebar: SidebarsConfig = { id: "api/appkit/TypeAlias.IAppRouter", label: "IAppRouter" }, + { + type: "doc", + id: "api/appkit/TypeAlias.JobHandle", + label: "JobHandle" + }, + { + type: "doc", + id: "api/appkit/TypeAlias.JobsExport", + label: "JobsExport" + }, { type: "doc", id: "api/appkit/TypeAlias.PluginData", diff --git a/docs/docs/plugins/caching.md b/docs/docs/plugins/caching.md index d064393b..3bd7fedd 100644 --- a/docs/docs/plugins/caching.md +++ b/docs/docs/plugins/caching.md @@ -1,5 +1,5 @@ --- -sidebar_position: 8 +sidebar_position: 9 --- # Caching diff --git a/docs/docs/plugins/index.md b/docs/docs/plugins/index.md index f0e4b51d..ba0e17ce 100644 --- a/docs/docs/plugins/index.md +++ b/docs/docs/plugins/index.md @@ -13,7 +13,7 @@ For complete API documentation, see the [`Plugin`](../api/appkit/Class.Plugin.md Configure plugins when creating your AppKit instance: ```typescript -import { createApp, server, analytics, genie, files } from "@databricks/appkit"; +import { createApp, server, analytics, genie, files, jobs } from "@databricks/appkit"; const 
AppKit = await createApp({ plugins: [ @@ -21,6 +21,7 @@ const AppKit = await createApp({ analytics(), genie(), files(), + jobs(), ], }); ``` diff --git a/docs/docs/plugins/jobs.md b/docs/docs/plugins/jobs.md new file mode 100644 index 00000000..7aad6fb7 --- /dev/null +++ b/docs/docs/plugins/jobs.md @@ -0,0 +1,237 @@ +--- +sidebar_position: 8 +--- + +# Jobs plugin + +Trigger and monitor [Databricks Lakeflow Jobs](https://docs.databricks.com/en/jobs/index.html) from your AppKit application. + +**Key features:** +- Multi-job support with named job keys +- Auto-discovery of jobs from environment variables +- Run-and-wait with SSE streaming status updates +- Parameter validation with Zod schemas +- Task-type-aware parameter mapping (notebook, python_wheel, sql, etc.) +- On-behalf-of (OBO) user execution + +## Basic usage + +```ts +import { createApp, server, jobs } from "@databricks/appkit"; + +await createApp({ + plugins: [server(), jobs()], +}); +``` + +With no explicit `jobs` config, the plugin reads `DATABRICKS_JOB_ID` from the environment and registers it under the `default` key. + +## Configuration options + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `timeout` | `number` | `60000` | Default timeout for Jobs API calls in ms | +| `pollIntervalMs` | `number` | `5000` | Poll interval for `runAndWait` in ms | +| `jobs` | `Record` | — | Named jobs to expose. 
Each key becomes a job accessor | + +### Per-job config (`JobConfig`) + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `timeout` | `number` | `600000` | Override the polling timeout for this job | +| `taskType` | `TaskType` | — | Task type for automatic parameter mapping | +| `params` | `z.ZodType` | — | Zod schema for runtime parameter validation | + +## Environment variables + +### Single-job mode + +Set `DATABRICKS_JOB_ID` to expose one job under the `default` key: + +```env +DATABRICKS_JOB_ID=123456 +``` + +```ts +const handle = AppKit.jobs("default"); +``` + +### Multi-job mode + +Set `DATABRICKS_JOB_` for each job: + +```env +DATABRICKS_JOB_ETL=123456 +DATABRICKS_JOB_ML=789012 +``` + +```ts +const etl = AppKit.jobs("etl"); +const ml = AppKit.jobs("ml"); +``` + +Environment variable names are uppercased; job keys are lowercased. Jobs discovered from the environment are merged with any explicit `jobs` config — explicit config wins. + +## Parameter validation + +Use `params` to enforce a Zod schema at runtime. 
Invalid parameters are rejected with a `400` before the job is triggered: + +```ts +import { z } from "zod"; + +jobs({ + jobs: { + etl: { + params: z.object({ + startDate: z.string(), + endDate: z.string(), + dryRun: z.boolean().optional(), + }), + }, + }, +}) +``` + +## Task type mapping + +When `taskType` is set, the plugin maps validated parameters to the correct SDK request fields automatically: + +| Task type | SDK field | Parameter shape | +|-----------|-----------|-----------------| +| `notebook` | `notebook_params` | `Record` — values coerced to string | +| `python_wheel` | `python_named_params` | `Record` — values coerced to string | +| `python_script` | `python_params` | `{ args: string[] }` — positional args | +| `spark_jar` | `jar_params` | `{ args: string[] }` — positional args | +| `sql` | `sql_params` | `Record` — values coerced to string | +| `dbt` | — | No parameters accepted | + +```ts +jobs({ + jobs: { + etl: { + taskType: "notebook", + params: z.object({ + startDate: z.string(), + endDate: z.string(), + }), + }, + }, +}) +``` + +When `taskType` is omitted, parameters are passed through to the SDK as-is. + +## Execution context + +All HTTP routes execute on behalf of the authenticated user (OBO). For programmatic access via `exports()`, use `.asUser(req)` to run in user context: + +```ts +// Service principal context (default) +const result = await AppKit.jobs("etl").runNow({ startDate: "2025-01-01" }); + +// User context (recommended in route handlers) +const result = await AppKit.jobs("etl").asUser(req).runNow({ startDate: "2025-01-01" }); +``` + +## HTTP endpoints + +All routes are mounted under `/api/jobs`. + +### Trigger a run + +``` +POST /api/jobs/:jobKey/run +Content-Type: application/json + +{ "params": { "startDate": "2025-01-01" } } +``` + +Returns `{ "runId": 12345 }`. 
+ +Add `?stream=true` to receive SSE status updates that poll until the run completes: + +``` +POST /api/jobs/:jobKey/run?stream=true +``` + +Each SSE event contains `{ status, timestamp, run }`. + +### List runs + +``` +GET /api/jobs/:jobKey/runs?limit=20 +``` + +Returns `{ "runs": [...] }`. Limit is clamped to 1–100, default 20. + +### Get run details + +``` +GET /api/jobs/:jobKey/runs/:runId +``` + +### Get latest status + +``` +GET /api/jobs/:jobKey/status +``` + +Returns `{ "status": "TERMINATED", "run": { ... } }` for the most recent run. + +### Cancel a run + +``` +DELETE /api/jobs/:jobKey/runs/:runId +``` + +Returns `204 No Content` on success. + +## Programmatic access + +The plugin exports a callable that selects a job by key: + +```ts +const AppKit = await createApp({ + plugins: [ + server(), + jobs({ + jobs: { + etl: { taskType: "notebook" }, + }, + }), + ], +}); + +const etl = AppKit.jobs("etl"); + +// Trigger a run +const result = await etl.runNow({ startDate: "2025-01-01" }); +if (result.ok) { + console.log("Run ID:", result.data.run_id); +} + +// Trigger and poll until completion +for await (const status of etl.runAndWait({ startDate: "2025-01-01" })) { + console.log(status.status); // "PENDING", "RUNNING", "TERMINATED", etc. +} + +// Read operations +await etl.lastRun(); +await etl.listRuns({ limit: 10 }); +await etl.getRun(12345); +await etl.getRunOutput(12345); +await etl.getJob(); + +// Cancel +await etl.cancelRun(12345); +``` + +All methods return [`ExecutionResult`](../api/appkit/TypeAlias.ExecutionResult.md) — check `result.ok` before accessing `result.data`. 
+ +## Execution defaults + +| Tier | Cache | Retry | Timeout | Methods | +|------|-------|-------|---------|---------| +| Read | 60s TTL | 3 attempts, 1s backoff | 30s | `getRun`, `getJob`, `listRuns`, `lastRun`, `getRunOutput` | +| Write | Disabled | Disabled | 120s | `runNow`, `cancelRun` | +| Stream | Disabled | Disabled | 600s | `runAndWait` (SSE polling) | diff --git a/packages/appkit/package.json b/packages/appkit/package.json index 64166c4c..ad2afdd1 100644 --- a/packages/appkit/package.json +++ b/packages/appkit/package.json @@ -77,7 +77,8 @@ "semver": "7.7.3", "shared": "workspace:*", "vite": "npm:rolldown-vite@7.1.14", - "ws": "8.18.3" + "ws": "8.18.3", + "zod": "^4.3.6" }, "devDependencies": { "@types/express": "4.17.25", diff --git a/packages/appkit/src/connectors/index.ts b/packages/appkit/src/connectors/index.ts index 41e7748c..60e1728a 100644 --- a/packages/appkit/src/connectors/index.ts +++ b/packages/appkit/src/connectors/index.ts @@ -1,5 +1,6 @@ export * from "./files"; export * from "./genie"; +export * from "./jobs"; export * from "./lakebase"; export * from "./lakebase-v1"; export * from "./sql-warehouse"; diff --git a/packages/appkit/src/connectors/jobs/client.ts b/packages/appkit/src/connectors/jobs/client.ts new file mode 100644 index 00000000..0eb2dcb6 --- /dev/null +++ b/packages/appkit/src/connectors/jobs/client.ts @@ -0,0 +1,212 @@ +import { + Context, + type jobs, + type WorkspaceClient, +} from "@databricks/sdk-experimental"; +import { AppKitError, ExecutionError } from "../../errors"; +import { createLogger } from "../../logging/logger"; +import type { TelemetryProvider } from "../../telemetry"; +import { + type Counter, + type Histogram, + type Span, + SpanKind, + SpanStatusCode, + TelemetryManager, +} from "../../telemetry"; +import type { JobsConnectorConfig } from "./types"; + +const logger = createLogger("connectors:jobs"); + +export class JobsConnector { + private readonly name = "jobs"; + private readonly config: 
JobsConnectorConfig; + private readonly telemetry: TelemetryProvider; + private readonly telemetryMetrics: { + apiCallCount: Counter; + apiCallDuration: Histogram; + }; + + constructor(config: JobsConnectorConfig) { + this.config = config; + this.telemetry = TelemetryManager.getProvider( + this.name, + this.config.telemetry, + ); + this.telemetryMetrics = { + apiCallCount: this.telemetry + .getMeter() + .createCounter("jobs.api_call.count", { + description: "Total number of Jobs API calls", + unit: "1", + }), + apiCallDuration: this.telemetry + .getMeter() + .createHistogram("jobs.api_call.duration", { + description: "Duration of Jobs API calls", + unit: "ms", + }), + }; + } + + async submitRun( + workspaceClient: WorkspaceClient, + request: jobs.SubmitRun, + signal?: AbortSignal, + ): Promise { + return this._callApi("submit", async () => { + return workspaceClient.jobs.submit(request, this._createContext(signal)); + }); + } + + async runNow( + workspaceClient: WorkspaceClient, + request: jobs.RunNow, + signal?: AbortSignal, + ): Promise { + return this._callApi("runNow", async () => { + return workspaceClient.jobs.runNow(request, this._createContext(signal)); + }); + } + + async getRun( + workspaceClient: WorkspaceClient, + request: jobs.GetRunRequest, + signal?: AbortSignal, + ): Promise { + return this._callApi("getRun", async () => { + return workspaceClient.jobs.getRun(request, this._createContext(signal)); + }); + } + + async getRunOutput( + workspaceClient: WorkspaceClient, + request: jobs.GetRunOutputRequest, + signal?: AbortSignal, + ): Promise { + return this._callApi("getRunOutput", async () => { + return workspaceClient.jobs.getRunOutput( + request, + this._createContext(signal), + ); + }); + } + + async cancelRun( + workspaceClient: WorkspaceClient, + request: jobs.CancelRun, + signal?: AbortSignal, + ): Promise { + await this._callApi("cancelRun", async () => { + return workspaceClient.jobs.cancelRun( + request, + this._createContext(signal), + ); + 
}); + } + + async listRuns( + workspaceClient: WorkspaceClient, + request: jobs.ListRunsRequest, + signal?: AbortSignal, + ): Promise { + return this._callApi("listRuns", async () => { + const runs: jobs.BaseRun[] = []; + const limit = Math.max(1, Math.min(request.limit ?? 100, 100)); + for await (const run of workspaceClient.jobs.listRuns( + request, + this._createContext(signal), + )) { + runs.push(run); + if (runs.length >= limit) break; + } + return runs; + }); + } + + async getJob( + workspaceClient: WorkspaceClient, + request: jobs.GetJobRequest, + signal?: AbortSignal, + ): Promise { + return this._callApi("getJob", async () => { + return workspaceClient.jobs.get(request, this._createContext(signal)); + }); + } + + private async _callApi( + operation: string, + fn: () => Promise, + ): Promise { + const startTime = Date.now(); + let success = false; + + return this.telemetry.startActiveSpan( + `jobs.${operation}`, + { + kind: SpanKind.CLIENT, + attributes: { + "jobs.operation": operation, + }, + }, + async (span: Span) => { + try { + const result = await fn(); + success = true; + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (error) { + span.recordException(error as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + if (error instanceof AppKitError) { + throw error; + } + // Preserve SDK ApiError (and any error with a numeric statusCode) + // so Plugin.execute() can map it to the correct HTTP status. + if ( + error instanceof Error && + "statusCode" in error && + typeof (error as Record).statusCode === "number" + ) { + throw error; + } + throw ExecutionError.statementFailed( + error instanceof Error ? 
error.message : String(error), + ); + } finally { + const duration = Date.now() - startTime; + span.end(); + this.telemetryMetrics.apiCallCount.add(1, { + operation, + success: success.toString(), + }); + this.telemetryMetrics.apiCallDuration.record(duration, { + operation, + success: success.toString(), + }); + + logger.event()?.setContext("jobs", { + operation, + duration_ms: duration, + success, + }); + } + }, + { name: this.name, includePrefix: true }, + ); + } + + private _createContext(signal?: AbortSignal) { + return new Context({ + cancellationToken: { + isCancellationRequested: signal?.aborted ?? false, + onCancellationRequested: (cb: () => void) => { + signal?.addEventListener("abort", cb, { once: true }); + }, + }, + }); + } +} diff --git a/packages/appkit/src/connectors/jobs/index.ts b/packages/appkit/src/connectors/jobs/index.ts new file mode 100644 index 00000000..efb4753a --- /dev/null +++ b/packages/appkit/src/connectors/jobs/index.ts @@ -0,0 +1,2 @@ +export { JobsConnector } from "./client"; +export type { JobsConnectorConfig } from "./types"; diff --git a/packages/appkit/src/connectors/jobs/types.ts b/packages/appkit/src/connectors/jobs/types.ts new file mode 100644 index 00000000..dd7dfbbe --- /dev/null +++ b/packages/appkit/src/connectors/jobs/types.ts @@ -0,0 +1,5 @@ +import type { TelemetryOptions } from "shared"; + +export interface JobsConnectorConfig { + telemetry?: TelemetryOptions; +} diff --git a/packages/appkit/src/index.ts b/packages/appkit/src/index.ts index 955bfde6..311af6aa 100644 --- a/packages/appkit/src/index.ts +++ b/packages/appkit/src/index.ts @@ -15,6 +15,7 @@ export type { } from "shared"; export { isSQLTypeMarker, sql } from "shared"; export { CacheManager } from "./cache"; +export type { JobsConnectorConfig } from "./connectors/jobs"; export type { DatabaseCredential, GenerateDatabaseCredentialRequest, @@ -53,7 +54,22 @@ export { type ToPlugin, toPlugin, } from "./plugin"; -export { analytics, files, genie, lakebase, 
server, serving } from "./plugins"; +export { + analytics, + files, + genie, + jobs, + lakebase, + server, + serving, +} from "./plugins"; +export type { + IJobsConfig, + JobAPI, + JobConfig, + JobHandle, + JobsExport, +} from "./plugins/jobs"; export type { EndpointConfig, ServingEndpointEntry, diff --git a/packages/appkit/src/plugins/index.ts b/packages/appkit/src/plugins/index.ts index 4d58082f..e2dd7b5a 100644 --- a/packages/appkit/src/plugins/index.ts +++ b/packages/appkit/src/plugins/index.ts @@ -1,6 +1,7 @@ export * from "./analytics"; export * from "./files"; export * from "./genie"; +export * from "./jobs"; export * from "./lakebase"; export * from "./server"; export * from "./serving"; diff --git a/packages/appkit/src/plugins/jobs/defaults.ts b/packages/appkit/src/plugins/jobs/defaults.ts new file mode 100644 index 00000000..477d180e --- /dev/null +++ b/packages/appkit/src/plugins/jobs/defaults.ts @@ -0,0 +1,37 @@ +import type { PluginExecuteConfig } from "shared"; + +/** + * Execution defaults for read-tier operations (getRun, getJob, listRuns, lastRun, getRunOutput). + * Cache 60s (ttl in seconds) + * Retry 3x with 1s backoff + * Timeout 30s + */ +export const JOBS_READ_DEFAULTS: PluginExecuteConfig = { + cache: { enabled: true, ttl: 60 }, + retry: { enabled: true, initialDelay: 1000, attempts: 3 }, + timeout: 30_000, +}; + +/** + * Execution defaults for write-tier operations (runNow, cancelRun). + * No cache + * No retry + * Timeout 120s + */ +export const JOBS_WRITE_DEFAULTS: PluginExecuteConfig = { + cache: { enabled: false }, + retry: { enabled: false }, + timeout: 120_000, +}; + +/** + * Execution defaults for stream-tier operations (runNowAndWait with polling). 
+ * No cache + * No retry + * Timeout 600s + */ +export const JOBS_STREAM_DEFAULTS: PluginExecuteConfig = { + cache: { enabled: false }, + retry: { enabled: false }, + timeout: 600_000, +}; diff --git a/packages/appkit/src/plugins/jobs/index.ts b/packages/appkit/src/plugins/jobs/index.ts new file mode 100644 index 00000000..9567342c --- /dev/null +++ b/packages/appkit/src/plugins/jobs/index.ts @@ -0,0 +1,8 @@ +export { jobs } from "./plugin"; +export type { + IJobsConfig, + JobAPI, + JobConfig, + JobHandle, + JobsExport, +} from "./types"; diff --git a/packages/appkit/src/plugins/jobs/manifest.json b/packages/appkit/src/plugins/jobs/manifest.json new file mode 100644 index 00000000..d17985aa --- /dev/null +++ b/packages/appkit/src/plugins/jobs/manifest.json @@ -0,0 +1,41 @@ +{ + "$schema": "https://databricks.github.io/appkit/schemas/plugin-manifest.schema.json", + "name": "jobs", + "displayName": "Jobs Plugin", + "description": "Trigger and monitor Databricks Lakeflow Jobs. Supports single-job mode (DATABRICKS_JOB_ID) or multi-job mode (DATABRICKS_JOB_{NAME} per job, e.g. DATABRICKS_JOB_ETL, DATABRICKS_JOB_ML).", + "resources": { + "required": [ + { + "type": "job", + "alias": "Job", + "resourceKey": "job", + "description": "A Databricks job to trigger and monitor", + "permission": "CAN_MANAGE_RUN", + "fields": { + "id": { + "env": "DATABRICKS_JOB_ID", + "description": "Job ID (numeric). Use DATABRICKS_JOB_ID for single-job mode, or DATABRICKS_JOB_{NAME} for each named job. Obtain from the Jobs UI or `databricks jobs list`." 
+ } + } + } + ], + "optional": [] + }, + "config": { + "schema": { + "type": "object", + "properties": { + "timeout": { + "type": "number", + "default": 60000, + "description": "Default timeout for Jobs API calls in milliseconds" + }, + "pollIntervalMs": { + "type": "number", + "default": 5000, + "description": "Poll interval for waiting on run completion in milliseconds" + } + } + } + } +} diff --git a/packages/appkit/src/plugins/jobs/params.ts b/packages/appkit/src/plugins/jobs/params.ts new file mode 100644 index 00000000..a0f1e8f0 --- /dev/null +++ b/packages/appkit/src/plugins/jobs/params.ts @@ -0,0 +1,53 @@ +import type { TaskType } from "./types"; + +/** + * Maps validated parameters to SDK request fields based on the task type. + * This is a pure function — stateless and testable in isolation. + */ +export function mapParams( + taskType: TaskType, + params: Record, +): Record { + switch (taskType) { + case "notebook": + // notebook_params expects Record, values coerced to string + return { + notebook_params: Object.fromEntries( + Object.entries(params).map(([k, v]) => [k, String(v)]), + ), + }; + case "python_wheel": + return { + python_named_params: Object.fromEntries( + Object.entries(params).map(([k, v]) => [k, String(v)]), + ), + }; + case "python_script": + // python_params expects string[] (positional args) + return { + python_params: Array.isArray(params.args) + ? params.args.map(String) + : [], + }; + case "spark_jar": + // jar_params expects string[] + return { + jar_params: Array.isArray(params.args) ? 
params.args.map(String) : [], + }; + case "sql": + return { + sql_params: Object.fromEntries( + Object.entries(params).map(([k, v]) => [k, String(v)]), + ), + }; + case "dbt": + if (Object.keys(params).length > 0) { + throw new Error("dbt tasks do not accept parameters"); + } + return {}; + default: { + const _exhaustive: never = taskType; + throw new Error(`Unknown task type: ${_exhaustive}`); + } + } +} diff --git a/packages/appkit/src/plugins/jobs/plugin.ts b/packages/appkit/src/plugins/jobs/plugin.ts new file mode 100644 index 00000000..fa5a9335 --- /dev/null +++ b/packages/appkit/src/plugins/jobs/plugin.ts @@ -0,0 +1,739 @@ +import { STATUS_CODES } from "node:http"; +import type { jobs as jobsTypes } from "@databricks/sdk-experimental"; +import type express from "express"; +import type { + IAppRequest, + IAppRouter, + PluginExecutionSettings, + StreamExecutionSettings, +} from "shared"; +import { toJSONSchema } from "zod"; +import { JobsConnector } from "../../connectors/jobs"; +import { getCurrentUserId, getWorkspaceClient } from "../../context"; +import { + ExecutionError, + InitializationError, + ValidationError, +} from "../../errors"; +import { createLogger } from "../../logging/logger"; +import type { ExecutionResult } from "../../plugin"; +import { Plugin, toPlugin } from "../../plugin"; +import type { PluginManifest, ResourceRequirement } from "../../registry"; +import { ResourceType } from "../../registry"; +import { + JOBS_READ_DEFAULTS, + JOBS_STREAM_DEFAULTS, + JOBS_WRITE_DEFAULTS, +} from "./defaults"; +import manifest from "./manifest.json"; +import { mapParams } from "./params"; +import type { + IJobsConfig, + JobAPI, + JobConfig, + JobHandle, + JobRunStatus, + JobsExport, +} from "./types"; + +const logger = createLogger("jobs"); + +const DEFAULT_WAIT_TIMEOUT = 600_000; +const DEFAULT_POLL_INTERVAL = 5_000; + +/** Replace upstream error messages with generic descriptions keyed by HTTP status. 
*/ +function errorResult(status: number): ExecutionResult { + return { + ok: false, + status, + message: STATUS_CODES[status] ?? "Request failed", + }; +} + +class JobsPlugin extends Plugin { + static manifest = manifest as PluginManifest; + + protected declare config: IJobsConfig; + private connector: JobsConnector; + private jobIds: Record = {}; + private jobConfigs: Record = {}; + private jobKeys: string[] = []; + + /** + * Scans process.env for DATABRICKS_JOB_* keys and merges with explicit config. + * Explicit config wins for per-job overrides; auto-discovered jobs get default `{}` config. + */ + static discoverJobs(config: IJobsConfig): Record { + const explicit = config.jobs ?? {}; + const discovered: Record = {}; + + const prefix = "DATABRICKS_JOB_"; + for (const key of Object.keys(process.env)) { + if (!key.startsWith(prefix)) continue; + if (key === "DATABRICKS_JOB_ID") continue; + const suffix = key.slice(prefix.length); + if (!suffix || !process.env[key]) continue; + const jobKey = suffix.toLowerCase(); + if (!(jobKey in explicit)) { + discovered[jobKey] = {}; + } + } + + // Single-job shorthand: DATABRICKS_JOB_ID maps to "default" key + if ( + process.env.DATABRICKS_JOB_ID && + Object.keys(explicit).length === 0 && + Object.keys(discovered).length === 0 + ) { + discovered.default = {}; + } + + return { ...discovered, ...explicit }; + } + + /** + * Generates resource requirements dynamically from discovered + configured jobs. + * Each job key maps to a `DATABRICKS_JOB_{KEY_UPPERCASE}` env var (or `DATABRICKS_JOB_ID` for "default"). + */ + static getResourceRequirements(config: IJobsConfig): ResourceRequirement[] { + const jobs = JobsPlugin.discoverJobs(config); + return Object.keys(jobs).map((key) => ({ + type: ResourceType.JOB, + alias: `job-${key}`, + resourceKey: `job-${key}`, + description: `Databricks Job "${key}"`, + permission: "CAN_MANAGE_RUN" as const, + fields: { + id: { + env: + key === "default" + ? 
"DATABRICKS_JOB_ID" + : `DATABRICKS_JOB_${key.toUpperCase()}`, + description: `Job ID for "${key}"`, + }, + }, + required: true, + })); + } + + constructor(config: IJobsConfig) { + super(config); + this.config = config; + this.connector = new JobsConnector({ + telemetry: config.telemetry, + }); + + const jobs = JobsPlugin.discoverJobs(config); + this.jobKeys = Object.keys(jobs); + this.jobConfigs = jobs; + + for (const key of this.jobKeys) { + const envVar = + key === "default" + ? "DATABRICKS_JOB_ID" + : `DATABRICKS_JOB_${key.toUpperCase()}`; + const jobIdStr = process.env[envVar]; + if (jobIdStr) { + const parsed = Number.parseInt(jobIdStr, 10); + if (!Number.isNaN(parsed)) { + this.jobIds[key] = parsed; + } + } + } + } + + async setup() { + const client = getWorkspaceClient(); + if (!client) { + throw new InitializationError( + "Jobs plugin requires a configured workspace client", + ); + } + + if (this.jobKeys.length === 0) { + logger.warn( + "No jobs configured. Set DATABRICKS_JOB_ID or DATABRICKS_JOB_ env vars.", + ); + } + + for (const key of this.jobKeys) { + if (!this.jobIds[key]) { + logger.warn(`Job "${key}" has no valid job ID configured.`); + } + } + + logger.info( + `Jobs plugin initialized with ${this.jobKeys.length} job(s): ${this.jobKeys.join(", ")}`, + ); + } + + private get client() { + return getWorkspaceClient(); + } + + private getJobId(jobKey: string): number { + const id = this.jobIds[jobKey]; + if (!id) { + const envVar = + jobKey === "default" + ? "DATABRICKS_JOB_ID" + : `DATABRICKS_JOB_${jobKey.toUpperCase()}`; + throw new Error( + `Job "${jobKey}" has no configured job ID. 
Set ${envVar} env var.`, + ); + } + return id; + } + + private _readSettings( + cacheKey: (string | number | object)[], + ): PluginExecutionSettings { + return { + default: { + ...JOBS_READ_DEFAULTS, + ...(this.config.timeout != null && { timeout: this.config.timeout }), + cache: { ...JOBS_READ_DEFAULTS.cache, cacheKey }, + }, + }; + } + + private _writeSettings(): PluginExecutionSettings { + return { + default: { + ...JOBS_WRITE_DEFAULTS, + ...(this.config.timeout != null && { timeout: this.config.timeout }), + }, + }; + } + + /** + * Creates a JobAPI for a specific configured job key. + * Each method is scoped to the job's configured ID. + */ + protected createJobAPI(jobKey: string): JobAPI { + const jobId = this.getJobId(jobKey); + const jobConfig = this.jobConfigs[jobKey]; + // Capture `this` for use in the async generator + const self = this; + // Eagerly capture the client and userId so that when createJobAPI is + // called inside an asUser() proxy (which runs in user context), the + // closures below use the user-scoped client instead of falling back + // to the service principal when the ALS context has already exited. + const client = this.client; + const userKey = getCurrentUserId(); + + return { + runNow: async ( + params?: Record, + ): Promise> => { + // Validate if schema exists + let validatedParams = params; + if (jobConfig?.params) { + const result = jobConfig.params.safeParse(params ?? {}); + if (!result.success) { + throw new ValidationError( + `Parameter validation failed for job "${jobKey}": ${result.error.message}`, + ); + } + validatedParams = result.data as Record; + } + + // Map params to SDK fields + const sdkFields = + jobConfig?.taskType && validatedParams + ? mapParams(jobConfig.taskType, validatedParams) + : (validatedParams ?? 
{}); + + const result = await self.execute( + async (signal) => + self.connector.runNow( + client, + { + ...sdkFields, + job_id: jobId, + }, + signal, + ), + self._writeSettings(), + userKey, + ); + return result.ok ? result : errorResult(result.status); + }, + + async *runAndWait( + params?: Record, + signal?: AbortSignal, + ): AsyncGenerator { + // Validate if schema exists + let validatedParams = params; + if (jobConfig?.params) { + const result = jobConfig.params.safeParse(params ?? {}); + if (!result.success) { + throw new ValidationError( + `Parameter validation failed for job "${jobKey}": ${result.error.message}`, + ); + } + validatedParams = result.data as Record; + } + + // Map params to SDK fields + const sdkFields = + jobConfig?.taskType && validatedParams + ? mapParams(jobConfig.taskType, validatedParams) + : (validatedParams ?? {}); + + const runResult = await self.execute( + async (signal) => + self.connector.runNow( + client, + { + ...sdkFields, + job_id: jobId, + }, + signal, + ), + self._writeSettings(), + userKey, + ); + + if (!runResult.ok) { + throw new ExecutionError("Failed to trigger job run"); + } + const runId = runResult.data.run_id; + if (!runId) { + throw new Error("runNow did not return a run_id"); + } + + const pollInterval = + self.config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL; + const timeout = jobConfig?.waitTimeout ?? 
DEFAULT_WAIT_TIMEOUT; + const startTime = Date.now(); + + while (true) { + if (signal?.aborted) return; + + if (Date.now() - startTime > timeout) { + throw new Error( + `Job run ${runId} polling timeout after ${timeout}ms`, + ); + } + + const runStatusResult = await self.execute( + async (signal) => + self.connector.getRun(client, { run_id: runId }, signal), + { + default: { + ...JOBS_READ_DEFAULTS, + cache: { enabled: false }, + }, + }, + userKey, + ); + if (!runStatusResult.ok) { + throw new ExecutionError( + `Failed to poll run status for run ${runId}`, + ); + } + const run = runStatusResult.data; + const state = run.state?.life_cycle_state; + + yield { status: state, timestamp: Date.now(), run }; + + if ( + state === "TERMINATED" || + state === "SKIPPED" || + state === "INTERNAL_ERROR" + ) { + return; + } + + await new Promise((resolve) => { + if (signal?.aborted) { + resolve(); + return; + } + const timer = setTimeout(resolve, pollInterval); + signal?.addEventListener( + "abort", + () => { + clearTimeout(timer); + resolve(); + }, + { once: true }, + ); + }); + if (signal?.aborted) return; + } + }, + + lastRun: async (): Promise< + ExecutionResult + > => { + const result = await self.execute( + async (signal) => + self.connector.listRuns( + client, + { job_id: jobId, limit: 1 }, + signal, + ), + self._readSettings(["jobs:lastRun", jobKey]), + userKey, + ); + if (!result.ok) return errorResult(result.status); + return { ok: true, data: result.data[0] }; + }, + + listRuns: async (options?: { + limit?: number; + }): Promise> => { + const result = await self.execute( + async (signal) => + self.connector.listRuns( + client, + { job_id: jobId, limit: options?.limit }, + signal, + ), + self._readSettings(["jobs:listRuns", jobKey, options ?? {}]), + userKey, + ); + return result.ok ? 
result : errorResult(result.status); + }, + + getRun: async ( + runId: number, + ): Promise> => { + const result = await self.execute( + async (signal) => + self.connector.getRun(client, { run_id: runId }, signal), + self._readSettings(["jobs:getRun", jobKey, runId]), + userKey, + ); + return result.ok ? result : errorResult(result.status); + }, + + getRunOutput: async ( + runId: number, + ): Promise> => { + const result = await self.execute( + async (signal) => + self.connector.getRunOutput(client, { run_id: runId }, signal), + self._readSettings(["jobs:getRunOutput", jobKey, runId]), + userKey, + ); + return result.ok ? result : errorResult(result.status); + }, + + cancelRun: async (runId: number): Promise> => { + const result = await self.execute( + async (signal) => + self.connector.cancelRun(client, { run_id: runId }, signal), + self._writeSettings(), + userKey, + ); + return result.ok ? result : errorResult(result.status); + }, + + getJob: async (): Promise> => { + const result = await self.execute( + async (signal) => + self.connector.getJob(client, { job_id: jobId }, signal), + self._readSettings(["jobs:getJob", jobKey]), + userKey, + ); + return result.ok ? result : errorResult(result.status); + }, + }; + } + + /** + * Resolve `:jobKey` from the request. Returns the key and ID, + * or sends a 404 and returns `{ jobKey: undefined, jobId: undefined }`. 
+ */ + private _resolveJob( + req: express.Request, + res: express.Response, + ): + | { jobKey: string; jobId: number } + | { jobKey: undefined; jobId: undefined } { + const jobKey = req.params.jobKey; + if (!this.jobKeys.includes(jobKey)) { + const safeKey = jobKey.replace(/[^a-zA-Z0-9_-]/g, ""); + res.status(404).json({ + error: `Unknown job "${safeKey}"`, + plugin: this.name, + }); + return { jobKey: undefined, jobId: undefined }; + } + const jobId = this.jobIds[jobKey]; + if (!jobId) { + res.status(404).json({ + error: `Job "${jobKey}" has no configured job ID`, + plugin: this.name, + }); + return { jobKey: undefined, jobId: undefined }; + } + return { jobKey, jobId }; + } + + private _sendStatusError(res: express.Response, status: number): void { + res.status(status).json({ + error: STATUS_CODES[status] ?? "Unknown Error", + plugin: this.name, + }); + } + + injectRoutes(router: IAppRouter) { + // POST /:jobKey/run + this.route(router, { + name: "run", + method: "post", + path: "/:jobKey/run", + handler: async (req: express.Request, res: express.Response) => { + const { jobKey } = this._resolveJob(req, res); + if (!jobKey) return; + + const rawParams = req.body?.params; + if ( + rawParams !== undefined && + (typeof rawParams !== "object" || + rawParams === null || + Array.isArray(rawParams)) + ) { + res.status(400).json({ + error: "params must be a plain object", + plugin: this.name, + }); + return; + } + const params = rawParams as Record | undefined; + const stream = req.query.stream === "true"; + + // Validate params eagerly so streaming requests get a clean 400 + // instead of a generic SSE error event. + const jobConfig = this.jobConfigs[jobKey]; + if (jobConfig?.params) { + const result = jobConfig.params.safeParse(params ?? 
{}); + if (!result.success) { + res.status(400).json({ + error: `Parameter validation failed for job "${jobKey}": ${result.error.message}`, + plugin: this.name, + }); + return; + } + } + + try { + const userPlugin = this.asUser(req) as JobsPlugin; + const api = userPlugin.createJobAPI(jobKey); + + if (stream) { + const streamSettings: StreamExecutionSettings = { + default: JOBS_STREAM_DEFAULTS, + }; + await this.executeStream( + res, + (signal) => api.runAndWait(params, signal), + streamSettings, + ); + } else { + const result = await api.runNow(params); + if (!result.ok) { + this._sendStatusError(res, result.status); + return; + } + res.json({ runId: result.data.run_id }); + } + } catch (error) { + if (error instanceof ValidationError) { + if (!res.headersSent) { + res.status(400).json({ error: error.message, plugin: this.name }); + } + return; + } + logger.error("Run failed for job %s: %O", jobKey, error); + if (!res.headersSent) { + res.status(500).json({ error: "Run failed", plugin: this.name }); + } + } + }, + }); + + // GET /:jobKey/runs + this.route(router, { + name: "runs", + method: "get", + path: "/:jobKey/runs", + handler: async (req: express.Request, res: express.Response) => { + const { jobKey } = this._resolveJob(req, res); + if (!jobKey) return; + + const limit = Math.max( + 1, + Math.min(Number.parseInt(req.query.limit as string, 10) || 20, 100), + ); + + try { + const userPlugin = this.asUser(req) as JobsPlugin; + const api = userPlugin.createJobAPI(jobKey); + const result = await api.listRuns({ limit }); + if (!result.ok) { + this._sendStatusError(res, result.status); + return; + } + res.json({ runs: result.data }); + } catch (error) { + logger.error("List runs failed for job %s: %O", jobKey, error); + res + .status(500) + .json({ error: "List runs failed", plugin: this.name }); + } + }, + }); + + // GET /:jobKey/runs/:runId + this.route(router, { + name: "run-detail", + method: "get", + path: "/:jobKey/runs/:runId", + handler: async (req: 
express.Request, res: express.Response) => { + const { jobKey } = this._resolveJob(req, res); + if (!jobKey) return; + + const runId = Number.parseInt(req.params.runId, 10); + if (Number.isNaN(runId) || runId <= 0) { + res.status(400).json({ error: "Invalid runId", plugin: this.name }); + return; + } + + try { + const userPlugin = this.asUser(req) as JobsPlugin; + const api = userPlugin.createJobAPI(jobKey); + const result = await api.getRun(runId); + if (!result.ok) { + this._sendStatusError(res, result.status); + return; + } + res.json(result.data); + } catch (error) { + logger.error( + "Get run failed for job %s run %d: %O", + jobKey, + runId, + error, + ); + res.status(500).json({ error: "Get run failed", plugin: this.name }); + } + }, + }); + + // GET /:jobKey/status + this.route(router, { + name: "status", + method: "get", + path: "/:jobKey/status", + handler: async (req: express.Request, res: express.Response) => { + const { jobKey } = this._resolveJob(req, res); + if (!jobKey) return; + + try { + const userPlugin = this.asUser(req) as JobsPlugin; + const api = userPlugin.createJobAPI(jobKey); + const result = await api.lastRun(); + if (!result.ok) { + this._sendStatusError(res, result.status); + return; + } + res.json({ + status: result.data?.state?.life_cycle_state ?? null, + run: result.data ?? 
null, + }); + } catch (error) { + logger.error("Status check failed for job %s: %O", jobKey, error); + res + .status(500) + .json({ error: "Status check failed", plugin: this.name }); + } + }, + }); + + // DELETE /:jobKey/runs/:runId + this.route(router, { + name: "cancel-run", + method: "delete", + path: "/:jobKey/runs/:runId", + handler: async (req: express.Request, res: express.Response) => { + const { jobKey } = this._resolveJob(req, res); + if (!jobKey) return; + + const runId = Number.parseInt(req.params.runId, 10); + if (Number.isNaN(runId) || runId <= 0) { + res.status(400).json({ error: "Invalid runId", plugin: this.name }); + return; + } + + try { + const userPlugin = this.asUser(req) as JobsPlugin; + const api = userPlugin.createJobAPI(jobKey); + const result = await api.cancelRun(runId); + if (!result.ok) { + this._sendStatusError(res, result.status); + return; + } + res.status(204).end(); + } catch (error) { + logger.error( + "Cancel run failed for job %s run %d: %O", + jobKey, + runId, + error, + ); + res + .status(500) + .json({ error: "Cancel run failed", plugin: this.name }); + } + }, + }); + } + + exports(): JobsExport { + const resolveJob = (jobKey: string): JobHandle => { + if (!this.jobKeys.includes(jobKey)) { + throw new Error( + `Unknown job "${jobKey}". Available jobs: ${this.jobKeys.join(", ")}`, + ); + } + + const spApi = this.createJobAPI(jobKey); + + return { + ...spApi, + asUser: (req: IAppRequest) => { + const userPlugin = this.asUser(req) as JobsPlugin; + return userPlugin.createJobAPI(jobKey); + }, + }; + }; + + return ((jobKey: string) => resolveJob(jobKey)) as JobsExport; + } + + clientConfig(): Record { + const jobs: Record = + {}; + for (const key of this.jobKeys) { + const config = this.jobConfigs[key]; + jobs[key] = { + params: config?.params ? toJSONSchema(config.params) : null, + taskType: config?.taskType ?? 
null, + }; + } + return { jobs }; + } +} + +/** + * @internal + */ +export const jobs = toPlugin(JobsPlugin); + +export { JobsPlugin }; diff --git a/packages/appkit/src/plugins/jobs/tests/plugin.test.ts b/packages/appkit/src/plugins/jobs/tests/plugin.test.ts new file mode 100644 index 00000000..0b0b189b --- /dev/null +++ b/packages/appkit/src/plugins/jobs/tests/plugin.test.ts @@ -0,0 +1,1712 @@ +import { mockServiceContext, setupDatabricksEnv } from "@tools/test-helpers"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { z } from "zod"; +import { ServiceContext } from "../../../context/service-context"; +import { AuthenticationError } from "../../../errors"; +import { ResourceType } from "../../../registry"; +import { + JOBS_READ_DEFAULTS, + JOBS_STREAM_DEFAULTS, + JOBS_WRITE_DEFAULTS, +} from "../defaults"; +import { mapParams } from "../params"; +import { JobsPlugin, jobs } from "../plugin"; + +const { mockClient, mockCacheInstance } = vi.hoisted(() => { + const mockJobsApi = { + runNow: vi.fn(), + submit: vi.fn(), + getRun: vi.fn(), + getRunOutput: vi.fn(), + cancelRun: vi.fn(), + listRuns: vi.fn(), + get: vi.fn(), + }; + + const mockClient = { + jobs: mockJobsApi, + config: { + host: "https://test.databricks.com", + authenticate: vi.fn(), + }, + }; + + const mockCacheInstance = { + get: vi.fn(), + set: vi.fn(), + delete: vi.fn(), + getOrExecute: vi.fn(async (_key: unknown[], fn: () => Promise) => + fn(), + ), + generateKey: vi.fn(), + }; + + return { mockJobsApi, mockClient, mockCacheInstance }; +}); + +vi.mock("@databricks/sdk-experimental", () => ({ + WorkspaceClient: vi.fn(() => mockClient), + Context: vi.fn(), +})); + +vi.mock("../../../context", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getWorkspaceClient: vi.fn(() => mockClient), + isInUserContext: vi.fn(() => true), + }; +}); + +vi.mock("../../../cache", () => ({ + CacheManager: { + getInstanceSync: vi.fn(() => 
mockCacheInstance), + }, +})); + +describe("JobsPlugin", () => { + let serviceContextMock: Awaited>; + + beforeEach(async () => { + vi.clearAllMocks(); + setupDatabricksEnv(); + ServiceContext.reset(); + serviceContextMock = await mockServiceContext(); + }); + + afterEach(() => { + serviceContextMock?.restore(); + delete process.env.DATABRICKS_JOB_ID; + delete process.env.DATABRICKS_JOB_ETL; + delete process.env.DATABRICKS_JOB_ML; + delete process.env.DATABRICKS_JOB_; + delete process.env.DATABRICKS_JOB_EMPTY; + delete process.env.DATABRICKS_JOB_MY_PIPELINE; + }); + + test('plugin name is "jobs"', () => { + const pluginData = jobs({}); + expect(pluginData.name).toBe("jobs"); + }); + + test("plugin instance has correct name", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + const plugin = new JobsPlugin({}); + expect(plugin.name).toBe("jobs"); + }); + + describe("discoverJobs", () => { + test("discovers jobs from DATABRICKS_JOB_* env vars", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + process.env.DATABRICKS_JOB_ML = "456"; + + const jobs = JobsPlugin.discoverJobs({}); + expect(jobs).toHaveProperty("etl"); + expect(jobs).toHaveProperty("ml"); + expect(jobs.etl).toEqual({}); + expect(jobs.ml).toEqual({}); + }); + + test("single-job case: DATABRICKS_JOB_ID maps to 'default' key", () => { + process.env.DATABRICKS_JOB_ID = "789"; + + const jobs = JobsPlugin.discoverJobs({}); + expect(jobs).toHaveProperty("default"); + expect(Object.keys(jobs)).toEqual(["default"]); + }); + + test("DATABRICKS_JOB_ID is ignored when named jobs exist", () => { + process.env.DATABRICKS_JOB_ID = "789"; + process.env.DATABRICKS_JOB_ETL = "123"; + + const jobs = JobsPlugin.discoverJobs({}); + expect(jobs).not.toHaveProperty("default"); + expect(jobs).toHaveProperty("etl"); + }); + + test("DATABRICKS_JOB_ID is ignored when explicit config exists", () => { + process.env.DATABRICKS_JOB_ID = "789"; + + const jobs = JobsPlugin.discoverJobs({ + jobs: { pipeline: {} }, + }); + 
expect(jobs).not.toHaveProperty("default"); + expect(jobs).toHaveProperty("pipeline"); + }); + + test("merges with explicit config, explicit wins", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + process.env.DATABRICKS_JOB_ML = "456"; + + const jobs = JobsPlugin.discoverJobs({ + jobs: { + etl: { waitTimeout: 42 }, + }, + }); + expect(jobs.etl).toEqual({ waitTimeout: 42 }); + expect(jobs.ml).toEqual({}); + }); + + test("skips bare DATABRICKS_JOB_ prefix (no suffix)", () => { + process.env.DATABRICKS_JOB_ = "999"; + try { + const jobs = JobsPlugin.discoverJobs({}); + expect(Object.keys(jobs)).not.toContain(""); + } finally { + delete process.env.DATABRICKS_JOB_; + } + }); + + test("skips empty env var values", () => { + process.env.DATABRICKS_JOB_EMPTY = ""; + try { + const jobs = JobsPlugin.discoverJobs({}); + expect(jobs).not.toHaveProperty("empty"); + } finally { + delete process.env.DATABRICKS_JOB_EMPTY; + } + }); + + test("lowercases env var suffix", () => { + process.env.DATABRICKS_JOB_MY_PIPELINE = "111"; + try { + const jobs = JobsPlugin.discoverJobs({}); + expect(jobs).toHaveProperty("my_pipeline"); + } finally { + delete process.env.DATABRICKS_JOB_MY_PIPELINE; + } + }); + + test("returns only explicit jobs when no env vars match", () => { + const jobs = JobsPlugin.discoverJobs({ + jobs: { custom: { waitTimeout: 10 } }, + }); + expect(Object.keys(jobs)).toEqual(["custom"]); + }); + }); + + describe("getResourceRequirements", () => { + test("generates one resource per job key", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + process.env.DATABRICKS_JOB_ML = "456"; + + const requirements = JobsPlugin.getResourceRequirements({}); + expect(requirements).toHaveLength(2); + + const etlReq = requirements.find((r) => r.resourceKey === "job-etl"); + expect(etlReq).toBeDefined(); + expect(etlReq?.type).toBe(ResourceType.JOB); + expect(etlReq?.permission).toBe("CAN_MANAGE_RUN"); + expect(etlReq?.fields.id.env).toBe("DATABRICKS_JOB_ETL"); + 
expect(etlReq?.required).toBe(true); + + const mlReq = requirements.find((r) => r.resourceKey === "job-ml"); + expect(mlReq).toBeDefined(); + expect(mlReq?.fields.id.env).toBe("DATABRICKS_JOB_ML"); + }); + + test("single-job case uses DATABRICKS_JOB_ID env var", () => { + process.env.DATABRICKS_JOB_ID = "789"; + + const requirements = JobsPlugin.getResourceRequirements({}); + expect(requirements).toHaveLength(1); + expect(requirements[0].resourceKey).toBe("job-default"); + expect(requirements[0].fields.id.env).toBe("DATABRICKS_JOB_ID"); + }); + + test("returns empty array when no jobs configured and no env vars", () => { + const requirements = JobsPlugin.getResourceRequirements({ jobs: {} }); + expect(requirements).toHaveLength(0); + }); + + test("auto-discovers jobs from env vars with empty config", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + process.env.DATABRICKS_JOB_ML = "456"; + + const requirements = JobsPlugin.getResourceRequirements({}); + expect(requirements).toHaveLength(2); + expect(requirements.map((r) => r.resourceKey).sort()).toEqual([ + "job-etl", + "job-ml", + ]); + }); + }); + + describe("exports()", () => { + test("returns a callable function", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const exported = plugin.exports(); + + expect(typeof exported).toBe("function"); + }); + + test("returns job handle with asUser and direct JobAPI methods", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const exported = plugin.exports(); + + const handle = exported("etl"); + expect(typeof handle.asUser).toBe("function"); + expect(typeof handle.runNow).toBe("function"); + expect(typeof handle.runAndWait).toBe("function"); + expect(typeof handle.lastRun).toBe("function"); + expect(typeof handle.listRuns).toBe("function"); + expect(typeof handle.getRun).toBe("function"); + expect(typeof handle.getRunOutput).toBe("function"); + expect(typeof 
handle.cancelRun).toBe("function"); + expect(typeof handle.getJob).toBe("function"); + }); + + test("throws for unknown job key", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const exported = plugin.exports(); + + expect(() => exported("unknown")).toThrow(/Unknown job "unknown"/); + }); + + test("single-job default key is accessible", () => { + process.env.DATABRICKS_JOB_ID = "789"; + + const plugin = new JobsPlugin({}); + const exported = plugin.exports(); + + expect(() => exported("default")).not.toThrow(); + const handle = exported("default"); + expect(typeof handle.runNow).toBe("function"); + }); + }); + + describe("runNow auto-fills job_id", () => { + test("runNow passes configured job_id to connector", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.runNow.mockResolvedValue({ run_id: 42 }); + + const plugin = new JobsPlugin({}); + const exported = plugin.exports(); + const handle = exported("etl"); + + await handle.runNow(); + + expect(mockClient.jobs.runNow).toHaveBeenCalledWith( + expect.objectContaining({ job_id: 123 }), + expect.anything(), + ); + }); + + test("runNow merges user params with configured job_id (no taskType)", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.runNow.mockResolvedValue({ run_id: 42 }); + + const plugin = new JobsPlugin({}); + const exported = plugin.exports(); + const handle = exported("etl"); + + await handle.runNow({ + notebook_params: { key: "value" }, + }); + + expect(mockClient.jobs.runNow).toHaveBeenCalledWith( + expect.objectContaining({ + job_id: 123, + notebook_params: { key: "value" }, + }), + expect.anything(), + ); + }); + }); + + describe("parameter validation (Phase 3)", () => { + test("runNow validates params against job config schema", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({ + jobs: { + etl: { + taskType: "notebook", + params: z.object({ key: z.string() }), + 
}, + }, + }); + const handle = plugin.exports()("etl"); + + await expect(handle.runNow({ key: 42 })).rejects.toThrow( + /Parameter validation failed for job "etl"/, + ); + }); + + test("runNow maps validated params to SDK fields when taskType is set", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.runNow.mockResolvedValue({ run_id: 42 }); + + const plugin = new JobsPlugin({ + jobs: { + etl: { + taskType: "notebook", + params: z.object({ key: z.string() }), + }, + }, + }); + const handle = plugin.exports()("etl"); + + await handle.runNow({ key: "value" }); + + expect(mockClient.jobs.runNow).toHaveBeenCalledWith( + expect.objectContaining({ + job_id: 123, + notebook_params: { key: "value" }, + }), + expect.anything(), + ); + }); + + test("runNow skips validation when no schema is configured", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.runNow.mockResolvedValue({ run_id: 42 }); + + const plugin = new JobsPlugin({}); + const handle = plugin.exports()("etl"); + + await expect(handle.runNow({ anything: "goes" })).resolves.not.toThrow(); + }); + }); + + describe("read operations use interceptors", () => { + test("getRun wraps call in execute", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.getRun.mockResolvedValue({ + run_id: 1, + state: { life_cycle_state: "TERMINATED" }, + }); + + const plugin = new JobsPlugin({}); + const executeSpy = vi.spyOn(plugin as any, "execute"); + const handle = plugin.exports()("etl"); + + await handle.getRun(1); + + expect(executeSpy).toHaveBeenCalledWith( + expect.any(Function), + expect.objectContaining({ + default: expect.objectContaining({ + cache: expect.objectContaining({ + cacheKey: ["jobs:getRun", "etl", 1], + }), + }), + }), + expect.any(String), + ); + }); + + test("getJob wraps call in execute", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.get.mockResolvedValue({ job_id: 123 }); + + const plugin = new 
JobsPlugin({}); + const executeSpy = vi.spyOn(plugin as any, "execute"); + const handle = plugin.exports()("etl"); + + await handle.getJob(); + + expect(executeSpy).toHaveBeenCalledWith( + expect.any(Function), + expect.objectContaining({ + default: expect.objectContaining({ + cache: expect.objectContaining({ + cacheKey: ["jobs:getJob", "etl"], + }), + }), + }), + expect.any(String), + ); + }); + + test("cancelRun wraps call in execute with write defaults", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.cancelRun.mockResolvedValue(undefined); + + const plugin = new JobsPlugin({}); + const executeSpy = vi.spyOn(plugin as any, "execute"); + const handle = plugin.exports()("etl"); + + await handle.cancelRun(1); + + expect(executeSpy).toHaveBeenCalledWith( + expect.any(Function), + { default: JOBS_WRITE_DEFAULTS }, + expect.any(String), + ); + }); + }); + + describe("runAndWait polling", () => { + test("runAndWait yields status updates and terminates on TERMINATED", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.runNow.mockResolvedValue({ run_id: 42 }); + mockClient.jobs.getRun + .mockResolvedValueOnce({ + run_id: 42, + state: { life_cycle_state: "RUNNING" }, + }) + .mockResolvedValueOnce({ + run_id: 42, + state: { life_cycle_state: "TERMINATED" }, + }); + + const plugin = new JobsPlugin({ pollIntervalMs: 10 }); + const handle = plugin.exports()("etl"); + + const statuses: any[] = []; + for await (const status of handle.runAndWait()) { + statuses.push(status); + } + + expect(statuses).toHaveLength(2); + expect(statuses[0].status).toBe("RUNNING"); + expect(statuses[1].status).toBe("TERMINATED"); + }); + + test("runAndWait throws when runNow returns no run_id", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.runNow.mockResolvedValue({}); + + const plugin = new JobsPlugin({}); + const handle = plugin.exports()("etl"); + + const gen = handle.runAndWait(); + await 
expect(gen.next()).rejects.toThrow( + "runNow did not return a run_id", + ); + }); + }); + + describe("error handling returns ExecutionResult", () => { + test("runNow returns error result on execute failure", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.runNow.mockRejectedValue(new Error("API timeout")); + + const plugin = new JobsPlugin({}); + const handle = plugin.exports()("etl"); + + const result = await handle.runNow(); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.status).toBeGreaterThanOrEqual(400); + // Message must be generic, not the raw server error + expect(result.message).not.toContain("API timeout"); + } + }); + + test("cancelRun returns error result on execute failure", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.cancelRun.mockRejectedValue( + new Error("Permission denied"), + ); + + const plugin = new JobsPlugin({}); + const handle = plugin.exports()("etl"); + + const result = await handle.cancelRun(42); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.status).toBeGreaterThanOrEqual(400); + expect(result.message).not.toContain("Permission denied"); + } + }); + + test("getRun returns error result on execute failure", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.getRun.mockRejectedValue( + new Error("Internal server error"), + ); + + const plugin = new JobsPlugin({}); + const handle = plugin.exports()("etl"); + + const result = await handle.getRun(42); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.status).toBeGreaterThanOrEqual(400); + expect(result.message).not.toContain("Internal server error"); + } + }); + + test("listRuns returns error result on execute failure", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.listRuns.mockImplementation(() => { + throw new Error("Auth failure"); + }); + + const plugin = new JobsPlugin({}); + const handle = 
plugin.exports()("etl"); + + const result = await handle.listRuns(); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.status).toBeGreaterThanOrEqual(400); + expect(result.message).not.toContain("Auth failure"); + } + }); + + test("error result preserves upstream HTTP status code", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const error = new Error("Detailed internal failure: db connection reset"); + (error as any).statusCode = 403; + mockClient.jobs.getRun.mockRejectedValue(error); + + const plugin = new JobsPlugin({}); + const handle = plugin.exports()("etl"); + + const result = await handle.getRun(42); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.status).toBe(403); + // Must use generic HTTP status text, not the raw upstream message + expect(result.message).not.toContain("db connection reset"); + } + }); + + test("successful operations return ok result with data", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.runNow.mockResolvedValue({ run_id: 42 }); + + const plugin = new JobsPlugin({}); + const handle = plugin.exports()("etl"); + + const result = await handle.runNow(); + expect(result.ok).toBe(true); + if (result.ok) { + expect(result.data.run_id).toBe(42); + } + }); + }); + + describe("runAndWait abort signal", () => { + test("runAndWait stops polling when signal is aborted", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.runNow.mockResolvedValue({ run_id: 42 }); + mockClient.jobs.getRun.mockResolvedValue({ + run_id: 42, + state: { life_cycle_state: "RUNNING" }, + }); + + const plugin = new JobsPlugin({ pollIntervalMs: 10 }); + const handle = plugin.exports()("etl"); + + const controller = new AbortController(); + const statuses: any[] = []; + + const gen = handle.runAndWait(undefined, controller.signal); + const first = await gen.next(); + statuses.push(first.value); + controller.abort(); + const second = await gen.next(); + 
expect(second.done).toBe(true); + expect(statuses).toHaveLength(1); + }); + }); + + describe("OBO and service principal access", () => { + test("job handle exposes asUser and all JobAPI methods", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const handle = plugin.exports()("etl"); + + expect(typeof handle.asUser).toBe("function"); + + const jobMethods = [ + "runNow", + "runAndWait", + "lastRun", + "listRuns", + "getRun", + "getRunOutput", + "cancelRun", + "getJob", + ]; + for (const method of jobMethods) { + expect(typeof (handle as any)[method]).toBe("function"); + } + }); + + test("asUser throws AuthenticationError without token in production", () => { + const originalEnv = process.env.NODE_ENV; + process.env.NODE_ENV = "production"; + process.env.DATABRICKS_JOB_ETL = "123"; + + try { + const plugin = new JobsPlugin({}); + const handle = plugin.exports()("etl"); + const mockReq = { header: () => undefined } as any; + + expect(() => handle.asUser(mockReq)).toThrow(AuthenticationError); + } finally { + process.env.NODE_ENV = originalEnv; + } + }); + + test("asUser in dev mode returns JobAPI with all methods", () => { + const originalEnv = process.env.NODE_ENV; + process.env.NODE_ENV = "development"; + process.env.DATABRICKS_JOB_ETL = "123"; + + try { + const plugin = new JobsPlugin({}); + const handle = plugin.exports()("etl"); + const mockReq = { header: () => undefined } as any; + const api = handle.asUser(mockReq); + + const jobMethods = [ + "runNow", + "runAndWait", + "lastRun", + "listRuns", + "getRun", + "getRunOutput", + "cancelRun", + "getJob", + ]; + for (const method of jobMethods) { + expect(typeof (api as any)[method]).toBe("function"); + } + } finally { + process.env.NODE_ENV = originalEnv; + } + }); + }); + + describe("clientConfig", () => { + test("returns configured job keys with params schema", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + process.env.DATABRICKS_JOB_ML = "456"; + + const plugin = 
new JobsPlugin({}); + const config = plugin.clientConfig(); + + expect(config).toEqual({ + jobs: { + etl: { params: null, taskType: null }, + ml: { params: null, taskType: null }, + }, + }); + }); + + test("returns single default key for DATABRICKS_JOB_ID", () => { + process.env.DATABRICKS_JOB_ID = "789"; + + const plugin = new JobsPlugin({}); + const config = plugin.clientConfig(); + + expect(config).toEqual({ + jobs: { + default: { params: null, taskType: null }, + }, + }); + }); + + test("returns empty jobs when no jobs configured", () => { + const plugin = new JobsPlugin({}); + const config = plugin.clientConfig(); + + expect(config).toEqual({ jobs: {} }); + }); + + test("includes JSON schema when params schema is configured", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({ + jobs: { + etl: { + params: z.object({ key: z.string() }), + }, + }, + }); + const config = plugin.clientConfig(); + const etlConfig = (config.jobs as any).etl; + + expect(etlConfig.params).toBeDefined(); + expect(etlConfig.params).not.toBeNull(); + expect(etlConfig.params.type).toBe("object"); + expect(etlConfig.params.properties).toHaveProperty("key"); + }); + }); + + describe("auto-discovery integration", () => { + test("jobs() with no config discovers from env vars", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const exported = plugin.exports(); + expect(() => exported("etl")).not.toThrow(); + }); + + test("jobs() with no config and no env vars creates no jobs", () => { + const plugin = new JobsPlugin({}); + const exported = plugin.exports(); + expect(() => exported("etl")).toThrow(/Unknown job/); + }); + }); + + describe("multi-job case", () => { + test("supports multiple configured jobs", () => { + process.env.DATABRICKS_JOB_ETL = "100"; + process.env.DATABRICKS_JOB_ML = "200"; + + const plugin = new JobsPlugin({}); + const exported = plugin.exports(); + + expect(() => exported("etl")).not.toThrow(); + 
expect(() => exported("ml")).not.toThrow(); + expect(() => exported("other")).toThrow(/Unknown job "other"/); + }); + + test("each job has its own job_id", async () => { + process.env.DATABRICKS_JOB_ETL = "100"; + process.env.DATABRICKS_JOB_ML = "200"; + + mockClient.jobs.runNow.mockResolvedValue({ run_id: 1 }); + + const plugin = new JobsPlugin({}); + const exported = plugin.exports(); + + await exported("etl").runNow(); + expect(mockClient.jobs.runNow).toHaveBeenCalledWith( + expect.objectContaining({ job_id: 100 }), + expect.anything(), + ); + + mockClient.jobs.runNow.mockClear(); + + await exported("ml").runNow(); + expect(mockClient.jobs.runNow).toHaveBeenCalledWith( + expect.objectContaining({ job_id: 200 }), + expect.anything(), + ); + }); + }); +}); + +describe("defaults", () => { + test("JOBS_READ_DEFAULTS has expected shape", () => { + expect(JOBS_READ_DEFAULTS.cache?.enabled).toBe(true); + expect(JOBS_READ_DEFAULTS.cache?.ttl).toBe(60); + expect(JOBS_READ_DEFAULTS.retry?.enabled).toBe(true); + expect(JOBS_READ_DEFAULTS.retry?.attempts).toBe(3); + expect(JOBS_READ_DEFAULTS.timeout).toBe(30_000); + }); + + test("JOBS_WRITE_DEFAULTS has no cache, no retry", () => { + expect(JOBS_WRITE_DEFAULTS.cache?.enabled).toBe(false); + expect(JOBS_WRITE_DEFAULTS.retry?.enabled).toBe(false); + expect(JOBS_WRITE_DEFAULTS.timeout).toBe(120_000); + }); + + test("JOBS_STREAM_DEFAULTS has extended timeout", () => { + expect(JOBS_STREAM_DEFAULTS.cache?.enabled).toBe(false); + expect(JOBS_STREAM_DEFAULTS.retry?.enabled).toBe(false); + expect(JOBS_STREAM_DEFAULTS.timeout).toBe(600_000); + }); +}); + +describe("mapParams", () => { + test("notebook maps to notebook_params with string coercion", () => { + const result = mapParams("notebook", { key: "value", num: 42 }); + expect(result).toEqual({ notebook_params: { key: "value", num: "42" } }); + }); + + test("python_wheel maps to python_named_params", () => { + const result = mapParams("python_wheel", { arg1: "a", arg2: "b" }); + 
expect(result).toEqual({ python_named_params: { arg1: "a", arg2: "b" } }); + }); + + test("python_script maps to python_params array", () => { + const result = mapParams("python_script", { args: ["a", "b", "c"] }); + expect(result).toEqual({ python_params: ["a", "b", "c"] }); + }); + + test("spark_jar maps to jar_params array", () => { + const result = mapParams("spark_jar", { args: ["x", "y"] }); + expect(result).toEqual({ jar_params: ["x", "y"] }); + }); + + test("sql maps to sql_params Record", () => { + const result = mapParams("sql", { p1: "v1", p2: 42 }); + expect(result).toEqual({ sql_params: { p1: "v1", p2: "42" } }); + }); + + test("dbt with empty params returns empty object", () => { + const result = mapParams("dbt", {}); + expect(result).toEqual({}); + }); + + test("dbt with params throws error", () => { + expect(() => mapParams("dbt", { key: "value" })).toThrow( + "dbt tasks do not accept parameters", + ); + }); + + test("notebook coerces non-string values to string", () => { + const result = mapParams("notebook", { + bool: true, + num: 123, + nil: "null", + }); + expect(result).toEqual({ + notebook_params: { bool: "true", num: "123", nil: "null" }, + }); + }); +}); + +describe("injectRoutes", () => { + let serviceContextMock: Awaited<ReturnType<typeof mockServiceContext>>; + + beforeEach(async () => { + vi.clearAllMocks(); + setupDatabricksEnv(); + ServiceContext.reset(); + serviceContextMock = await mockServiceContext(); + }); + + afterEach(() => { + serviceContextMock?.restore(); + delete process.env.DATABRICKS_JOB_ETL; + delete process.env.DATABRICKS_JOB_ML; + }); + + test("registers all 5 routes via this.route()", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { + get: vi.fn(), + post: vi.fn(), + delete: vi.fn(), + }; + + plugin.injectRoutes(mockRouter as any); + + expect(routeSpy).toHaveBeenCalledTimes(5); + + const routeCalls = routeSpy.mock.calls.map((call) => 
(call[1] as any).name); + expect(routeCalls).toContain("run"); + expect(routeCalls).toContain("runs"); + expect(routeCalls).toContain("run-detail"); + expect(routeCalls).toContain("status"); + expect(routeCalls).toContain("cancel-run"); + }); + + test("registers correct HTTP methods and paths", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { + get: vi.fn(), + post: vi.fn(), + delete: vi.fn(), + }; + + plugin.injectRoutes(mockRouter as any); + + const routes = routeSpy.mock.calls.map((call) => ({ + name: (call[1] as any).name, + method: (call[1] as any).method, + path: (call[1] as any).path, + })); + + expect(routes).toContainEqual({ + name: "run", + method: "post", + path: "/:jobKey/run", + }); + expect(routes).toContainEqual({ + name: "runs", + method: "get", + path: "/:jobKey/runs", + }); + expect(routes).toContainEqual({ + name: "run-detail", + method: "get", + path: "/:jobKey/runs/:runId", + }); + expect(routes).toContainEqual({ + name: "status", + method: "get", + path: "/:jobKey/status", + }); + expect(routes).toContainEqual({ + name: "cancel-run", + method: "delete", + path: "/:jobKey/runs/:runId", + }); + }); + + describe("_resolveJob", () => { + test("returns 404 for unknown job key", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const resolveJob = (plugin as any)._resolveJob.bind(plugin); + + const mockReq = { params: { jobKey: "unknown" } } as any; + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + const result = resolveJob(mockReq, mockRes); + + expect(result.jobKey).toBeUndefined(); + expect(mockRes.status).toHaveBeenCalledWith(404); + expect(mockRes.json).toHaveBeenCalledWith( + expect.objectContaining({ + error: 'Unknown job "unknown"', + plugin: "jobs", + }), + ); + }); + + test("sanitizes special characters in unknown job key error", () => { + 
process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const resolveJob = (plugin as any)._resolveJob.bind(plugin); + + const mockReq = { + params: { jobKey: '<script>alert("xss")</script>' }, + } as any; + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + resolveJob(mockReq, mockRes); + + expect(mockRes.json).toHaveBeenCalledWith( + expect.objectContaining({ + error: 'Unknown job "scriptalertxssscript"', + }), + ); + }); + + test("returns jobKey and jobId for known job", () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const resolveJob = (plugin as any)._resolveJob.bind(plugin); + + const mockReq = { params: { jobKey: "etl" } } as any; + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + const result = resolveJob(mockReq, mockRes); + + expect(result.jobKey).toBe("etl"); + expect(result.jobId).toBe(123); + expect(mockRes.status).not.toHaveBeenCalled(); + }); + }); + + describe("POST /:jobKey/run handler", () => { + test("returns runId on successful non-streaming run", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.runNow.mockResolvedValue({ run_id: 42 }); + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const runRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "run", + ); + const handler = (runRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + body: { params: { key: "value" } }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + setHeader: vi.fn(), + flushHeaders: vi.fn(), + write: vi.fn(), + end: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.json).toHaveBeenCalledWith({ runId: 42 }); + 
}); + + test("returns 400 on parameter validation failure", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({ + jobs: { + etl: { + taskType: "notebook", + params: z.object({ key: z.string() }), + }, + }, + }); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const runRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "run", + ); + const handler = (runRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + body: { params: { key: 42 } }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(400); + expect(mockRes.json).toHaveBeenCalledWith( + expect.objectContaining({ + plugin: "jobs", + }), + ); + }); + }); + + describe("GET /:jobKey/runs handler", () => { + test("returns runs with default pagination", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const mockRuns = [ + { run_id: 1, state: { life_cycle_state: "TERMINATED" } }, + { run_id: 2, state: { life_cycle_state: "RUNNING" } }, + ]; + mockClient.jobs.listRuns.mockReturnValue( + (async function* () { + for (const run of mockRuns) yield run; + })(), + ); + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const runsRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "runs", + ); + const handler = (runsRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } 
as any; + + await handler(mockReq, mockRes); + + expect(mockRes.json).toHaveBeenCalledWith({ + runs: mockRuns, + }); + }); + + test("passes limit query param to listRuns", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.listRuns.mockReturnValue((async function* () {})()); + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const runsRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "runs", + ); + const handler = (runsRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + query: { limit: "5" }, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + // Verify the connector was called with limit 5 + expect(mockClient.jobs.listRuns).toHaveBeenCalledWith( + expect.objectContaining({ limit: 5 }), + expect.anything(), + ); + }); + }); + + describe("GET /:jobKey/runs/:runId handler", () => { + test("returns run detail", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const mockRun = { + run_id: 42, + state: { life_cycle_state: "TERMINATED" }, + }; + mockClient.jobs.getRun.mockResolvedValue(mockRun); + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const detailRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "run-detail", + ); + const handler = (detailRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl", runId: "42" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + await 
handler(mockReq, mockRes); + + expect(mockRes.json).toHaveBeenCalledWith(mockRun); + }); + + test("returns 400 for invalid runId", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const detailRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "run-detail", + ); + const handler = (detailRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl", runId: "not-a-number" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(400); + expect(mockRes.json).toHaveBeenCalledWith({ + error: "Invalid runId", + plugin: "jobs", + }); + }); + }); + + describe("GET /:jobKey/status handler", () => { + test("returns latest run status", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const mockRun = { + run_id: 42, + state: { life_cycle_state: "TERMINATED" }, + }; + mockClient.jobs.listRuns.mockReturnValue( + (async function* () { + yield mockRun; + })(), + ); + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const statusRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "status", + ); + const handler = (statusRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.json).toHaveBeenCalledWith({ + status: "TERMINATED", 
+ run: mockRun, + }); + }); + + test("returns null status when no runs exist", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.listRuns.mockReturnValue((async function* () {})()); + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const statusRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "status", + ); + const handler = (statusRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.json).toHaveBeenCalledWith({ + status: null, + run: null, + }); + }); + }); + + describe("DELETE /:jobKey/runs/:runId handler", () => { + test("cancels run and returns 204", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.cancelRun.mockResolvedValue(undefined); + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const cancelRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "cancel-run", + ); + const handler = (cancelRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl", runId: "42" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + end: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(204); + expect(mockRes.end).toHaveBeenCalled(); + }); + + test("returns 400 for invalid runId", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new 
JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const cancelRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "cancel-run", + ); + const handler = (cancelRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl", runId: "not-a-number" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + end: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(400); + expect(mockRes.json).toHaveBeenCalledWith({ + error: "Invalid runId", + plugin: "jobs", + }); + }); + + test("returns 404 for unknown job key", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const cancelRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "cancel-run", + ); + const handler = (cancelRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "unknown", runId: "42" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + end: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(404); + }); + }); + + describe("POST /:jobKey/run params validation", () => { + test("returns 400 when params is an array", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const runRoute = 
routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "run", + ); + const handler = (runRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + body: { params: [1, 2, 3] }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(400); + expect(mockRes.json).toHaveBeenCalledWith({ + error: "params must be a plain object", + plugin: "jobs", + }); + }); + + test("returns 400 when params is a string", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const runRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "run", + ); + const handler = (runRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + body: { params: "not-an-object" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(400); + expect(mockRes.json).toHaveBeenCalledWith({ + error: "params must be a plain object", + plugin: "jobs", + }); + }); + + test("allows undefined params", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.runNow.mockResolvedValue({ run_id: 42 }); + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const runRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "run", + ); + const handler = (runRoute?.[1] as 
any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + body: {}, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + setHeader: vi.fn(), + flushHeaders: vi.fn(), + write: vi.fn(), + end: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.json).toHaveBeenCalledWith({ runId: 42 }); + }); + }); + + describe("routes propagate error status codes", () => { + test("POST /:jobKey/run returns upstream status on failure", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const error = new Error("Sensitive internal detail: token expired"); + (error as any).statusCode = 403; + mockClient.jobs.runNow.mockRejectedValue(error); + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const runRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "run", + ); + const handler = (runRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + body: {}, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + headersSent: false, + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(403); + expect(mockRes.json).toHaveBeenCalledWith( + expect.objectContaining({ plugin: "jobs" }), + ); + // Must not leak raw server error message + const responseError = mockRes.json.mock.calls[0][0].error; + expect(responseError).not.toContain("token expired"); + }); + + test("GET /:jobKey/runs returns upstream status on failure", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const error = new Error("Unauthorized"); + (error as any).statusCode = 401; + mockClient.jobs.listRuns.mockImplementation(() => { + throw error; + }); 
+ + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const runsRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "runs", + ); + const handler = (runsRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(401); + }); + + test("DELETE /:jobKey/runs/:runId returns upstream status on failure", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + const error = new Error("Forbidden"); + (error as any).statusCode = 403; + mockClient.jobs.cancelRun.mockRejectedValue(error); + + const plugin = new JobsPlugin({}); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const cancelRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "cancel-run", + ); + const handler = (cancelRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl", runId: "42" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + end: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(403); + // Should NOT fall through to 204 + expect(mockRes.end).not.toHaveBeenCalled(); + }); + }); + + describe("OBO header forwarding", () => { + test("routes call this.asUser(req) for user context", async () => { + process.env.DATABRICKS_JOB_ETL = "123"; + + mockClient.jobs.listRuns.mockReturnValue((async function* () {})()); + + const plugin = new JobsPlugin({}); + const 
asUserSpy = vi.spyOn(plugin, "asUser"); + const routeSpy = vi.spyOn(plugin as any, "route"); + + const mockRouter = { get: vi.fn(), post: vi.fn(), delete: vi.fn() }; + plugin.injectRoutes(mockRouter as any); + + const runsRoute = routeSpy.mock.calls.find( + (call) => (call[1] as any).name === "runs", + ); + const handler = (runsRoute?.[1] as any).handler; + + const mockReq = { + params: { jobKey: "etl" }, + query: {}, + header: vi.fn().mockReturnValue("test-token"), + } as any; + + const mockRes = { + status: vi.fn().mockReturnThis(), + json: vi.fn(), + } as any; + + await handler(mockReq, mockRes); + + expect(asUserSpy).toHaveBeenCalledWith(mockReq); + }); + }); +}); diff --git a/packages/appkit/src/plugins/jobs/types.ts b/packages/appkit/src/plugins/jobs/types.ts new file mode 100644 index 00000000..ca68ba0b --- /dev/null +++ b/packages/appkit/src/plugins/jobs/types.ts @@ -0,0 +1,95 @@ +import type { jobs } from "@databricks/sdk-experimental"; +import type { BasePluginConfig, IAppRequest } from "shared"; +import type { z } from "zod"; +import type { ExecutionResult } from "../../plugin"; + +/** Supported task types for job parameter mapping. */ +export type TaskType = + | "notebook" + | "python_wheel" + | "python_script" + | "spark_jar" + | "sql" + | "dbt"; + +/** Per-job configuration options. */ +export interface JobConfig { + /** Maximum time (ms) to poll in runAndWait before giving up. Defaults to 600 000 (10 min). */ + waitTimeout?: number; + /** The type of task this job runs. Determines how params are mapped to the SDK request. */ + taskType?: TaskType; + /** Optional Zod schema for validating job parameters at runtime. */ + params?: z.ZodType<Record<string, unknown>>; +} + +/** Status update yielded by runAndWait during polling. */ +export interface JobRunStatus { + status: string | undefined; + timestamp: number; + run: jobs.Run; +} + +/** User-facing API for a single configured job. */ +export interface JobAPI { + /** Trigger the configured job with validated params. 
Returns the run response. */ + runNow( + params?: Record<string, unknown>, + ): Promise<ExecutionResult<jobs.RunNowResponse>>; + /** Trigger and poll until completion, yielding status updates. */ + runAndWait( + params?: Record<string, unknown>, + signal?: AbortSignal, + ): AsyncGenerator<JobRunStatus>; + /** Get the most recent run for this job. */ + lastRun(): Promise<ExecutionResult<jobs.BaseRun | undefined>>; + /** List runs for this job. */ + listRuns(options?: { + limit?: number; + }): Promise<ExecutionResult<jobs.BaseRun[]>>; + /** Get a specific run by ID. */ + getRun(runId: number): Promise<ExecutionResult<jobs.Run>>; + /** Get output of a specific run. */ + getRunOutput(runId: number): Promise<ExecutionResult<jobs.RunOutput>>; + /** Cancel a specific run. */ + cancelRun(runId: number): Promise<ExecutionResult<void>>; + /** Get the job definition. */ + getJob(): Promise<ExecutionResult<jobs.Job>>; +} + +/** Configuration for the Jobs plugin. */ +export interface IJobsConfig extends BasePluginConfig { + /** Operation timeout in milliseconds. Defaults to 60000. */ + timeout?: number; + /** Poll interval for waitForRun in milliseconds. Defaults to 5000. */ + pollIntervalMs?: number; + /** Named jobs to expose. Each key becomes a job accessor. */ + jobs?: Record<string, JobConfig>; +} + +/** + * Job handle returned by `appkit.jobs("etl")`. + * Supports OBO access via `.asUser(req)`. + */ +export type JobHandle = JobAPI & { + asUser: (req: IAppRequest) => JobAPI; +}; + +/** + * Public API shape of the jobs plugin. + * Callable to select a job by key. 
+ * + * @example + * ```ts + * // Trigger a configured job + * const { run_id } = await appkit.jobs("etl").runNow(); + * + * // Trigger and poll until completion + * for await (const status of appkit.jobs("etl").runAndWait()) { + * console.log(status.status, status.run); + * } + * + * // OBO access + * await appkit.jobs("etl").asUser(req).runNow(); + * ``` + */ +export type JobsExport = (jobKey: string) => JobHandle; diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index 8b511fac..92c33da1 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -121,6 +121,11 @@ export class StreamManager { streamEntry.clients.delete(res); this.activeOperations.delete(streamOperation); + // Stop the generator when no clients remain + if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { + streamEntry.abortController.abort("All clients disconnected"); + } + // cleanup if stream is completed and no clients are connected if (streamEntry.isCompleted && streamEntry.clients.size === 0) { setTimeout(() => { @@ -199,6 +204,12 @@ export class StreamManager { clearInterval(heartbeat); this.activeOperations.delete(streamOperation); streamEntry.clients.delete(res); + + // Stop the generator when no clients remain so polling loops + // (e.g. jobs runAndWait) don't keep running in the background. 
+ if (streamEntry.clients.size === 0 && !streamEntry.isCompleted) { + abortController.abort("Client disconnected"); + } }); await this._processGeneratorInBackground(streamEntry); diff --git a/packages/appkit/src/stream/tests/stream.test.ts b/packages/appkit/src/stream/tests/stream.test.ts index fae54289..ae626f46 100644 --- a/packages/appkit/src/stream/tests/stream.test.ts +++ b/packages/appkit/src/stream/tests/stream.test.ts @@ -193,6 +193,43 @@ describe("StreamManager", () => { expect(streamManager.getActiveCount()).toBe(0); }); + + test("should abort generator when last client disconnects", async () => { + const { mockRes } = createMockResponse(); + let closeHandler: (() => void) | undefined; + + mockRes.on.mockImplementation((event: string, handler: () => void) => { + if (event === "close") closeHandler = handler; + }); + + let signalAborted = false; + + async function* generator(signal: AbortSignal) { + yield { type: "start" }; + // Simulate a long-running polling loop that respects the signal + await new Promise((resolve) => { + if (signal.aborted) { + resolve(); + return; + } + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + signalAborted = signal.aborted; + } + + const streamPromise = streamManager.stream(mockRes as any, generator); + + // Let the generator yield "start" and enter the signal wait + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Simulate client disconnect + if (closeHandler) closeHandler(); + + await streamPromise; + + expect(signalAborted).toBe(true); + expect(streamManager.getActiveCount()).toBe(0); + }); }); describe("error handling", () => { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9ca11b81..5920e05e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -326,6 +326,9 @@ importers: ws: specifier: 8.18.3 version: 8.18.3(bufferutil@4.0.9) + zod: + specifier: ^4.3.6 + version: 4.3.6 devDependencies: '@types/express': specifier: 4.17.25 @@ -5539,7 +5542,7 @@ packages: basic-ftp@5.0.5: 
resolution: {integrity: sha512-4Bcg1P8xhUuqcii/S0Z9wiHIrQVPMermM1any+MX5GeGD7faD3/msQUDGLol9wOcz4/jbg/WJnGqoJF6LiBdtg==} engines: {node: '>=10.0.0'} - deprecated: Security vulnerability fixed in 5.2.0, please upgrade + deprecated: Security vulnerability fixed in 5.2.1, please upgrade batch@0.6.1: resolution: {integrity: sha512-x+VAiMRL6UPkx+kudNvxTl6hB2XNNCG2r+7wixVfIYwu/2HKRXimwQyaumLjMveWvT2Hkd/cAJw+QBMfJ/EKVw==} @@ -6653,6 +6656,7 @@ packages: dottie@2.0.6: resolution: {integrity: sha512-iGCHkfUc5kFekGiqhe8B/mdaurD+lakO9txNnTvKtA6PISrw86LgqHvRzWYPyoE2Ph5aMIrCw9/uko6XHTKCwA==} + deprecated: Package no longer supported. Contact Support at https://www.npmjs.com/support for more info. drizzle-orm@0.45.1: resolution: {integrity: sha512-Te0FOdKIistGNPMq2jscdqngBRfBpC8uMFVwqjf6gtTVJHIQ/dosgV/CLBU2N4ZJBsXL5savCba9b0YJskKdcA==} @@ -11919,6 +11923,9 @@ packages: zod@4.1.13: resolution: {integrity: sha512-AvvthqfqrAhNH9dnfmrfKzX5upOdjUVJYFqNSlkmGf64gRaTzlPwz99IHYnVs28qYAybvAlBV+H7pn0saFY4Ig==} + zod@4.3.6: + resolution: {integrity: sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==} + zrender@6.0.0: resolution: {integrity: sha512-41dFXEEXuJpNecuUQq6JlbybmnHaqqpGlbH1yxnA5V9MMP4SbohSVZsJIwz+zdjQXSSlR1Vc34EgH1zxyTDvhg==} @@ -25288,6 +25295,8 @@ snapshots: zod@4.1.13: {} + zod@4.3.6: {} + zrender@6.0.0: dependencies: tslib: 2.3.0 diff --git a/template/appkit.plugins.json b/template/appkit.plugins.json index d1420d2e..6b71a5e1 100644 --- a/template/appkit.plugins.json +++ b/template/appkit.plugins.json @@ -74,6 +74,30 @@ "optional": [] } }, + "jobs": { + "name": "jobs", + "displayName": "Jobs Plugin", + "description": "Trigger and monitor Databricks Lakeflow Jobs. Supports single-job mode (DATABRICKS_JOB_ID) or multi-job mode (DATABRICKS_JOB_{NAME} per job, e.g. 
DATABRICKS_JOB_ETL, DATABRICKS_JOB_ML).", + "package": "@databricks/appkit", + "resources": { + "required": [ + { + "type": "job", + "alias": "Job", + "resourceKey": "job", + "description": "A Databricks job to trigger and monitor", + "permission": "CAN_MANAGE_RUN", + "fields": { + "id": { + "env": "DATABRICKS_JOB_ID", + "description": "Job ID (numeric). Use DATABRICKS_JOB_ID for single-job mode, or DATABRICKS_JOB_{NAME} for each named job. Obtain from the Jobs UI or `databricks jobs list`." + } + } + } + ], + "optional": [] + } + }, "lakebase": { "name": "lakebase", "displayName": "Lakebase",