From 669788ab78fabffcc6dc87e922e10e67ac1b4fea Mon Sep 17 00:00:00 2001 From: Hugo Dutka Date: Mon, 19 Jan 2026 18:01:56 +0100 Subject: [PATCH 1/2] feat(api): add optional auth to devhook routes --- internal/api/src/middleware.ts | 15 +++++++ internal/api/src/routes/devhook.server.ts | 5 ++- internal/api/src/routes/devhook.test.ts | 51 +++++++++++++++++------ internal/api/src/server.ts | 1 + internal/api/src/test.ts | 2 + internal/worker/src/new-api.ts | 1 + packages/server/src/server.ts | 1 + 7 files changed, 62 insertions(+), 14 deletions(-) diff --git a/internal/api/src/middleware.ts b/internal/api/src/middleware.ts index 141f1ee0..be44b830 100644 --- a/internal/api/src/middleware.ts +++ b/internal/api/src/middleware.ts @@ -205,6 +205,21 @@ function readAuthTokenFromRequest( export const withAuth = createAuthMiddleware(); +export const withDevhookAuth: MiddlewareHandler<{ + Bindings: Bindings; + Variables: { + user_id?: string; + api_key?: ApiKey; + auth_type?: "session" | "api_key"; + }; +}> = async (c, next) => { + if (c.env.devhook?.disableAuth) { + await next(); + return; + } + return withAuth(c as Parameters[0], next); +}; + export const withOrganizationIDQueryParam: MiddlewareHandler< { Bindings: Bindings; diff --git a/internal/api/src/routes/devhook.server.ts b/internal/api/src/routes/devhook.server.ts index 9ed3423a..a97aaddf 100644 --- a/internal/api/src/routes/devhook.server.ts +++ b/internal/api/src/routes/devhook.server.ts @@ -1,9 +1,10 @@ import { validate } from "uuid"; +import { withDevhookAuth } from "../middleware"; import type { APIServer } from "../server"; import { createWebhookURL } from "../server-helper"; export default function mountDevhook(server: APIServer) { - server.get("/:devhook/url", async (c) => { + server.get("/:devhook/url", withDevhookAuth, async (c) => { const id = c.req.param("devhook"); if (!validate(id)) { return c.json({ message: "Invalid devhook ID" }, 400); @@ -20,7 +21,7 @@ export default function mountDevhook(server: APIServer) { return c.json({ url }); }); - server.get("/:devhook", async (c) => { + server.get("/:devhook", withDevhookAuth, async (c) => { const id = c.req.param("devhook"); if (!validate(id)) { return c.json({ message: "Invalid devhook ID" }, 400); diff --git a/internal/api/src/routes/devhook.test.ts b/internal/api/src/routes/devhook.test.ts index d14e7160..f0df42bc 100644 --- a/internal/api/src/routes/devhook.test.ts +++ b/internal/api/src/routes/devhook.test.ts @@ -28,10 +28,43 @@ test("devhook url with path-based routing", async () => { expect(url).toBe(expectedUrl.toString()); }); -test("devhook", async () => { - const { helpers, bindings, url } = await serve(); - // This endpoint doesn't actually need auth, we just - // create a user to easily get a client. +test("devhook routes require auth by default", async () => { + const { url } = await serve(); + const id = crypto.randomUUID(); + + const urlResponse = await fetch(new URL(`/api/devhook/${id}/url`, url)); + expect(urlResponse.status).toBe(401); + + const listenResponse = await fetch(new URL(`/api/devhook/${id}`, url)); + expect(listenResponse.status).toBe(401); +}); + +test("devhook routes allow unauthenticated access when disableAuth is true", async () => { + const { url, bindings } = await serve({ + bindings: { devhook: { disableAuth: true } }, + }); + const id = crypto.randomUUID(); + + const urlResponse = await fetch(new URL(`/api/devhook/${id}/url`, url)); + expect(urlResponse.status).toBe(200); + const data = await urlResponse.json(); + expect(data.url).toBe( + bindings.createRequestURL!(id).toString().replace(/\/$/, "") + ); + + // Listen endpoint tries to upgrade to WebSocket, so without proper headers + // it won't succeed, but it should not return 401 + const listenResponse = await fetch(new URL(`/api/devhook/${id}`, url)); + expect(listenResponse.status).not.toBe(401); +}); + +test.each([ + { disableAuth: true, name: "without auth" }, + { disableAuth: false, name: "with auth" }, +])("devhook listen $name", async ({ disableAuth }) => { + const { helpers, bindings, url } = await serve({ + bindings: { devhook: { disableAuth } }, + }); const { client } = await helpers.createUser(); const id = crypto.randomUUID(); @@ -44,12 +77,9 @@ test("devhook", async () => { let requestReceived = false; client.devhook.listen({ id, - onError: (err) => { - console.error("Error", err); - }, - onRequest: async (req) => { + onError: () => {}, + onRequest: async () => { requestReceived = true; - // Verify the request URL is correct. return new Response("Hello from devhook!"); }, onConnect: () => { @@ -57,11 +87,8 @@ test("devhook", async () => { }, }); - // Ensure connection works. await connectPromise; - // Test wildcard hostname routing. - // We need to make the request go through the test server with the correct Host header. const devhookURL = bindings.createRequestURL!(id); const response = await fetch(url, { headers: { Host: devhookURL.host } }); expect(response.status).toBe(200); diff --git a/internal/api/src/server.ts b/internal/api/src/server.ts index 0944d99e..74ce46e5 100644 --- a/internal/api/src/server.ts +++ b/internal/api/src/server.ts @@ -196,6 +196,7 @@ export interface Bindings { readonly devhook?: { readonly handleListen: (id: string, req: Request) => Promise; readonly handleRequest: (id: string, req: Request) => Promise; + readonly disableAuth?: boolean; }; readonly sendEmail?: (email: Email) => Promise; readonly sendTelemetryEvent?: (event: TelemetryEvent) => Promise; diff --git a/internal/api/src/test.ts b/internal/api/src/test.ts index 2e4e481b..f7fe18b9 100644 --- a/internal/api/src/test.ts +++ b/internal/api/src/test.ts @@ -14,6 +14,7 @@ export interface PartialBindings Bindings, | "auth" | "chat" + | "devhook" | "files" | "logs" | "traces" @@ -24,6 +25,7 @@ export interface PartialBindings > { auth?: Partial; chat?: Partial; + devhook?: Partial>; files?: Partial; logs?: Partial; traces?: Partial; diff --git a/internal/worker/src/new-api.ts b/internal/worker/src/new-api.ts index dba2fa9a..2fbc1781 100644 --- a/internal/worker/src/new-api.ts +++ b/internal/worker/src/new-api.ts @@ -252,6 +252,7 @@ export default function handleNewAPI( }, }, devhook: { + disableAuth: true, handleListen: async (id, req) => { const ws = env.WORKSPACE.get( env.WORKSPACE.idFromName(id) diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index 1152c576..a26c9d9e 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -228,6 +228,7 @@ export async function startServer(options: ServerOptions) { devhook: { handleListen: devhook.handleListen, handleRequest: devhook.handleRequest, + disableAuth: process.env.BLINK_DEVHOOK_DISABLE_AUTH !== undefined, }, chat: { async handleMessagesChanged(event, id, messages) { From d26f7cde20acba526dbb014ad802c60e4c6d7bf8 Mon Sep 17 00:00:00 2001 From: Hugo Dutka Date: Thu, 22 Jan 2026 12:54:40 +0100 Subject: [PATCH 2/2] devhook auth wip --- internal/api/src/routes/devhook.server.ts | 6 + internal/api/src/routes/devhook.test.ts | 3 + packages/server/src/devhook.test.ts | 172 ++++--- packages/server/src/server.ts | 531 ++++++++++++---------- packages/server/src/test.ts | 233 ++++++++++ 5 files changed, 637 insertions(+), 308 deletions(-) create mode 100644 packages/server/src/test.ts diff --git a/internal/api/src/routes/devhook.server.ts b/internal/api/src/routes/devhook.server.ts index a97aaddf..667c6526 100644 --- a/internal/api/src/routes/devhook.server.ts +++ b/internal/api/src/routes/devhook.server.ts @@ -4,6 +4,9 @@ import type { APIServer } from "../server"; import { createWebhookURL } from "../server-helper"; export default function mountDevhook(server: APIServer) { + // this endpoint is used by packages/server/src/server.ts + // to authorize the listen request. it must use the exact same auth + // method as the one required to listen on the matching devhook URL. server.get("/:devhook/url", withDevhookAuth, async (c) => { const id = c.req.param("devhook"); if (!validate(id)) { @@ -21,6 +24,9 @@ export default function mountDevhook(server: APIServer) { return c.json({ url }); }); + // this endpoint is somewhat misleading. in self-hosted mode, + // it's not used during the flow to listen on the devhook URL. + // websocket upgrade logic is handled in packages/server/src/server.ts server.get("/:devhook", withDevhookAuth, async (c) => { const id = c.req.param("devhook"); if (!validate(id)) { diff --git a/internal/api/src/routes/devhook.test.ts b/internal/api/src/routes/devhook.test.ts index f0df42bc..943c293e 100644 --- a/internal/api/src/routes/devhook.test.ts +++ b/internal/api/src/routes/devhook.test.ts @@ -87,8 +87,11 @@ test.each([ }, }); + // Ensure connection works. await connectPromise; + // Test wildcard hostname routing. + // We need to make the request go through the test server with the correct Host header. const devhookURL = bindings.createRequestURL!(id); const response = await fetch(url, { headers: { Host: devhookURL.host } }); expect(response.status).toBe(200); diff --git a/packages/server/src/devhook.test.ts b/packages/server/src/devhook.test.ts index 8161262c..ba88a8db 100644 --- a/packages/server/src/devhook.test.ts +++ b/packages/server/src/devhook.test.ts @@ -1,25 +1,13 @@ -import { afterAll, beforeAll, describe, expect, test } from "bun:test"; -import { serve } from "@blink.so/api/test"; +import { afterAll, describe, expect, test } from "bun:test"; +import Client from "@blink.so/api"; import { createDevhookSupport } from "./devhook"; +import { serve } from "./test"; -describe("devhook integration tests", () => { - let server: Awaited>; - let devhookSupport: ReturnType; +describe("devhook integration tests", async () => { + const server = await serve(); - beforeAll(async () => { - server = await serve(); - - // Create devhook support with the test server's database - const querier = await server.bindings.database(); - devhookSupport = createDevhookSupport({ - accessUrl: server.url.toString(), - wildcardAccessUrl: `*.${server.url.host}`, - querier, - }); - }); - - afterAll(() => { - server.stop(); + afterAll(async () => { + await server[Symbol.asyncDispose](); }); describe("UUID validation", () => { @@ -51,34 +39,116 @@ describe("devhook integration tests", () => { }); }); - describe("handleRequest", () => { - test("rejects WebSocket upgrade requests before proxying", async () => { - const id = crypto.randomUUID(); + describe("createDevhookSupport", async () => { + const devhookSupport = createDevhookSupport({ + accessUrl: server.url.toString(), + wildcardAccessUrl: `*.${server.url.host}`, + querier: await server.bindings.database(), + }); - // Use the real createDevhookSupport handleRequest - const req = new Request("http://localhost/test", { - headers: { - Upgrade: "websocket", - Connection: "Upgrade", - }, + describe("handleRequest", () => { + test("rejects WebSocket upgrade requests before proxying", async () => { + const id = crypto.randomUUID(); + + // Use the real createDevhookSupport handleRequest + const req = new Request("http://localhost/test", { + headers: { + Upgrade: "websocket", + Connection: "Upgrade", + }, + }); + + const response = await devhookSupport.handleRequest(id, req); + + expect(response.status).toBe(501); + const body = await response.json(); + expect(body.message).toBe("WebSocket proxying not supported"); }); - const response = await devhookSupport.handleRequest(id, req); + test("returns 503 when devhook not connected", async () => { + const id = crypto.randomUUID(); + const req = new Request("http://localhost/test"); - expect(response.status).toBe(501); - const body = await response.json(); - expect(body.message).toBe("WebSocket proxying not supported"); + const response = await devhookSupport.handleRequest(id, req); + + expect(response.status).toBe(503); + const body = await response.json(); + expect(body.message).toBe("Devhook not connected"); + }); }); - test("returns 503 when devhook not connected", async () => { - const id = crypto.randomUUID(); - const req = new Request("http://localhost/test"); + describe("matchRequestHost", () => { + test("extracts UUID from wildcard hostname", () => { + const id = crypto.randomUUID(); + const host = `${id}.${server.url.host}`; - const response = await devhookSupport.handleRequest(id, req); + const matched = devhookSupport.matchRequestHost?.(host); + expect(matched).toBe(id); + }); + + test("returns undefined for base host", () => { + const matched = devhookSupport.matchRequestHost?.(server.url.host); + expect(matched).toBeUndefined(); + }); - expect(response.status).toBe(503); + test("returns undefined for invalid UUID subdomain", () => { + const host = `not-a-uuid.${server.url.host}`; + const matched = devhookSupport.matchRequestHost?.(host); + expect(matched).toBeUndefined(); + }); + + test("returns undefined for unrelated host", () => { + const matched = devhookSupport.matchRequestHost?.("example.com"); + expect(matched).toBeUndefined(); + }); + }); + }); + + describe("authentication", () => { + test("requires auth for devhook listen", async () => { + const id = crypto.randomUUID(); + const client = new Client({ baseURL: server.url.toString() }); + let connected = false; + let errorEvent: unknown; + + const outcome = await new Promise<"error" | "disconnect">( + (resolve, reject) => { + const timer = setTimeout(() => { + reject(new Error("Timed out waiting for devhook auth failure")); + }, 5000); + + let disposable: { dispose: () => void } | undefined; + disposable = client.devhook.listen({ + id, + onRequest: async () => new Response("ok"), + onConnect: () => { + connected = true; + }, + onDisconnect: () => { + clearTimeout(timer); + disposable?.dispose(); + resolve("disconnect"); + }, + onError: (err) => { + errorEvent = err; + clearTimeout(timer); + disposable?.dispose(); + resolve("error"); + }, + }); + } + ); + + expect(outcome).toBe("error"); + expect(connected).toBe(false); + expect(errorEvent).toBeDefined(); + + // Assert the actual auth error message via HTTP since WebSocket errors + // do not expose the handshake response body. + const response = await client.request("GET", `/api/devhook/${id}/url`); + expect(response.status).toBe(401); const body = await response.json(); - expect(body.message).toBe("Devhook not connected"); + expect(body.message).toBe("Unauthorized"); }); }); @@ -159,30 +229,4 @@ describe("devhook integration tests", () => { expect(receivedBody).toBe('{"test":"data"}'); }); }); - - describe("matchRequestHost", () => { - test("extracts UUID from wildcard hostname", () => { - const id = crypto.randomUUID(); - const host = `${id}.${server.url.host}`; - - const matched = devhookSupport.matchRequestHost?.(host); - expect(matched).toBe(id); - }); - - test("returns undefined for base host", () => { - const matched = devhookSupport.matchRequestHost?.(server.url.host); - expect(matched).toBeUndefined(); - }); - - test("returns undefined for invalid UUID subdomain", () => { - const host = `not-a-uuid.${server.url.host}`; - const matched = devhookSupport.matchRequestHost?.(host); - expect(matched).toBeUndefined(); - }); - - test("returns undefined for unrelated host", () => { - const matched = devhookSupport.matchRequestHost?.("example.com"); - expect(matched).toBeUndefined(); - }); - }); }); diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index a26c9d9e..eb759c74 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -1,4 +1,4 @@ -import api from "@blink.so/api/server"; +import api, { type Bindings } from "@blink.so/api/server"; import connectToPostgres from "@blink.so/database/postgres"; import Querier from "@blink.so/database/querier"; import pkg from "../package.json" with { type: "json" }; @@ -10,6 +10,7 @@ import { type IncomingMessage, type ServerResponse, } from "node:http"; +import type { ExecutionContext } from "hono"; import module from "module"; import path, { join } from "path"; import { parse } from "url"; @@ -20,7 +21,7 @@ import { createDevhookSupport } from "./devhook"; type WSData = { type: "token"; id: string } | { type: "chat"; chatID: string }; -interface ServerOptions { +export interface ServerOptions { port: number; postgresUrl: string; authSecret: string; @@ -32,7 +33,13 @@ interface ServerOptions { // Files are now stored in the database instead of in-memory -export async function startServer(options: ServerOptions) { +export type StartedServer = ReturnType & { + bindings: Bindings; +}; + +export async function startServer( + options: ServerOptions +): Promise { const { port, postgresUrl, @@ -130,248 +137,246 @@ export async function startServer(options: ServerOptions) { }); }; + const bindings: Bindings = { + AUTH_SECRET: authSecret, + NODE_ENV: "development", + autoJoinOrganizations: true, + serverVersion: pkg.version, + ONBOARDING_AGENT_BUNDLE_URL: + "https://artifacts.blink.host/starter-agent/bundle.tar.gz", + agentStore: (deploymentTargetID) => { + return { + delete: async (key) => { + await querier.deleteAgentStorageKV({ + deployment_target_id: deploymentTargetID, + key, + }); + }, + get: async (key) => { + const value = await querier.selectAgentStorageKV({ + deployment_target_id: deploymentTargetID, + key, + }); + if (!value) { + return undefined; + } + return value.value; + }, + set: async (key, value) => { + const target = + await querier.selectAgentDeploymentTargetByID(deploymentTargetID); + if (!target) { + throw new Error("Deployment target not found"); + } + await querier.upsertAgentStorageKV({ + agent_deployment_target_id: target.id, + agent_id: target.agent_id, + key: key, + value: value, + }); + }, + list: async (prefix, options) => { + const values = await querier.selectAgentStorageKVByPrefix({ + deployment_target_id: deploymentTargetID, + prefix: prefix ?? "", + limit: options?.limit ?? 100, + cursor: options?.cursor, + }); + return { + entries: values.items.map((value) => ({ + key: value.key, + value: value.value, + })), + cursor: values.next_cursor ? values.next_cursor : undefined, + }; + }, + }; + }, + database: async () => { + return querier; + }, + apiBaseURL: new URL(baseUrl), + accessUrl: new URL(accessUrl), + matchRequestHost, + createRequestURL, + auth: { + handleWebSocketTokenRequest: async (id, request) => { + // WebSocket upgrades are handled in the 'upgrade' event + return new Response(null, { status: 101 }); + }, + sendTokenToWebSocket: async (id, token) => { + wss.clients.forEach((client) => { + const data = wsDataMap.get(client); + if ( + client.readyState === WebSocket.OPEN && + data?.type === "token" && + data.id === id + ) { + client.send(token); + client.close(); + } + }); + }, + }, + devhook: { + handleListen: devhook.handleListen, + handleRequest: devhook.handleRequest, + disableAuth: process.env.BLINK_DEVHOOK_DISABLE_AUTH !== undefined, + }, + chat: { + async handleMessagesChanged(event, id, messages) { + await chatManagerRef.current?.handleMessagesChanged( + event, + id, + messages + ); + }, + handleStart: async (opts) => { + await chatManagerRef.current?.handleStart(opts); + }, + handleStop: async (id) => { + await chatManagerRef.current?.handleStop(id); + }, + handleStream: async (id, req) => { + if (!chatManagerRef.current) { + return new Response("Server not ready", { status: 503 }); + } + // WebSocket upgrades are handled in the 'upgrade' event + if (req.headers.get("upgrade")?.toLowerCase() === "websocket") { + return new Response(null, { status: 101 }); + } + return await chatManagerRef.current.handleStream(id, req); + }, + generateTitle: async (opts) => { + // noop + }, + }, + deployAgent: async (deployment) => { + await deployAgentWithDocker({ + image: + process.env.BLINK_AGENT_IMAGE ?? "ghcr.io/coder/blink-agent:latest", + deployment, + querier, + baseUrl, + accessUrl, + authSecret, + downloadFile: async (id: string) => { + const file = await querier.selectFileByID(id); + if (!file || !file.content) { + throw new Error("File not found"); + } + + // Convert buffer back to ReadableStream + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(file.content); + controller.close(); + }, + }); + + return { + stream, + type: file.content_type, + name: file.name, + size: file.byte_length, + }; + }, + }); + }, + files: { + upload: async (opts) => { + const id = crypto.randomUUID(); + + // Read file content into buffer + const arrayBuffer = await opts.file.arrayBuffer(); + const buffer = Buffer.from(arrayBuffer); + + // Store file in database + await querier.insertFile({ + id, + name: opts.file.name, + message_id: null, + user_id: null, + organization_id: null, + content_type: opts.file.type, + byte_length: opts.file.size, + pdf_page_count: null, + content: buffer, + }); + + return { + id, + url: `${baseUrl}/api/files/${id}`, + }; + }, + download: async (id) => { + const file = await querier.selectFileByID(id); + if (!file || !file.content) { + throw new Error("File not found"); + } + + // Convert buffer back to ReadableStream + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(file.content); + controller.close(); + }, + }); + + return { + stream, + type: file.content_type, + name: file.name, + size: file.byte_length, + }; + }, + }, + logs: { + get: async (opts) => { + return querier.getAgentLogs(opts); + }, + write: async (opts) => { + await querier.writeAgentLog(opts); + }, + }, + traces: { + write: async (spans) => { + await querier.writeAgentTraces(spans); + }, + read: async (opts) => { + return querier.readAgentTraces(opts); + }, + }, + runtime: { + usage: async (opts) => { + // noop + throw new Error("Not implemented"); + }, + }, + }; + + const executionCtx: ExecutionContext = { + waitUntil: async (_promise) => { + // noop + }, + passThroughOnException: () => { + // noop + }, + props: {}, + }; + // Create HTTP server const server = createServer(async (nodeReq, nodeRes) => { try { - const url = new URL( - nodeReq.url || "/", - `http://${nodeReq.headers.host || `localhost:${port}`}` - ); + const hostHeader = nodeReq.headers.host || `localhost:${port}`; + const url = new URL(nodeReq.url || "/", `http://${hostHeader}`); + const isSubdomainRequest = matchRequestHost + ? Boolean(matchRequestHost(hostHeader)) + : false; - if (url.pathname.startsWith("/api")) { + if (url.pathname.startsWith("/api") || isSubdomainRequest) { const req = toFetchRequest(nodeReq); - const response = await api.fetch( - req, - { - AUTH_SECRET: authSecret, - NODE_ENV: "development", - autoJoinOrganizations: true, - serverVersion: pkg.version, - ONBOARDING_AGENT_BUNDLE_URL: - "https://artifacts.blink.host/starter-agent/bundle.tar.gz", - agentStore: (deploymentTargetID) => { - return { - delete: async (key) => { - await querier.deleteAgentStorageKV({ - deployment_target_id: deploymentTargetID, - key, - }); - }, - get: async (key) => { - const value = await querier.selectAgentStorageKV({ - deployment_target_id: deploymentTargetID, - key, - }); - if (!value) { - return undefined; - } - return value.value; - }, - set: async (key, value) => { - const target = - await querier.selectAgentDeploymentTargetByID( - deploymentTargetID - ); - if (!target) { - throw new Error("Deployment target not found"); - } - await querier.upsertAgentStorageKV({ - agent_deployment_target_id: target.id, - agent_id: target.agent_id, - key: key, - value: value, - }); - }, - list: async (prefix, options) => { - const values = await querier.selectAgentStorageKVByPrefix({ - deployment_target_id: deploymentTargetID, - prefix: prefix ?? "", - limit: options?.limit ?? 100, - cursor: options?.cursor, - }); - return { - entries: values.items.map((value) => ({ - key: value.key, - value: value.value, - })), - cursor: values.next_cursor ? values.next_cursor : undefined, - }; - }, - }; - }, - database: async () => { - return querier; - }, - apiBaseURL: url, - accessUrl: new URL(accessUrl), - matchRequestHost, - createRequestURL, - auth: { - handleWebSocketTokenRequest: async (id, request) => { - // WebSocket upgrades are handled in the 'upgrade' event - return new Response(null, { status: 101 }); - }, - sendTokenToWebSocket: async (id, token) => { - wss.clients.forEach((client) => { - const data = wsDataMap.get(client); - if ( - client.readyState === WebSocket.OPEN && - data?.type === "token" && - data.id === id - ) { - client.send(token); - client.close(); - } - }); - }, - }, - devhook: { - handleListen: devhook.handleListen, - handleRequest: devhook.handleRequest, - disableAuth: process.env.BLINK_DEVHOOK_DISABLE_AUTH !== undefined, - }, - chat: { - async handleMessagesChanged(event, id, messages) { - await chatManagerRef.current?.handleMessagesChanged( - event, - id, - messages - ); - }, - handleStart: async (opts) => { - await chatManagerRef.current?.handleStart(opts); - }, - handleStop: async (id) => { - await chatManagerRef.current?.handleStop(id); - }, - handleStream: async (id, req) => { - if (!chatManagerRef.current) { - return new Response("Server not ready", { status: 503 }); - } - // WebSocket upgrades are handled in the 'upgrade' event - if (req.headers.get("upgrade")?.toLowerCase() === "websocket") { - return new Response(null, { status: 101 }); - } - return await chatManagerRef.current.handleStream(id, req); - }, - generateTitle: async (opts) => { - // noop - }, - }, - deployAgent: async (deployment) => { - await deployAgentWithDocker({ - image: - process.env.BLINK_AGENT_IMAGE ?? - "ghcr.io/coder/blink-agent:latest", - deployment, - querier, - baseUrl, - accessUrl, - authSecret, - downloadFile: async (id: string) => { - const file = await querier.selectFileByID(id); - if (!file || !file.content) { - throw new Error("File not found"); - } - - // Convert buffer back to ReadableStream - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(file.content); - controller.close(); - }, - }); - - return { - stream, - type: file.content_type, - name: file.name, - size: file.byte_length, - }; - }, - }); - }, - files: { - upload: async (opts) => { - const id = crypto.randomUUID(); - - // Read file content into buffer - const arrayBuffer = await opts.file.arrayBuffer(); - const buffer = Buffer.from(arrayBuffer); - - // Store file in database - await querier.insertFile({ - id, - name: opts.file.name, - message_id: null, - user_id: null, - organization_id: null, - content_type: opts.file.type, - byte_length: opts.file.size, - pdf_page_count: null, - content: buffer, - }); - - return { - id, - url: `${baseUrl}/api/files/${id}`, - }; - }, - download: async (id) => { - const file = await querier.selectFileByID(id); - if (!file || !file.content) { - throw new Error("File not found"); - } - - // Convert buffer back to ReadableStream - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(file.content); - controller.close(); - }, - }); - - return { - stream, - type: file.content_type, - name: file.name, - size: file.byte_length, - }; - }, - }, - logs: { - get: async (opts) => { - return querier.getAgentLogs(opts); - }, - write: async (opts) => { - await querier.writeAgentLog(opts); - }, - }, - traces: { - write: async (spans) => { - await querier.writeAgentTraces(spans); - }, - read: async (opts) => { - return querier.readAgentTraces(opts); - }, - }, - runtime: { - usage: async (opts) => { - // noop - throw new Error("Not implemented"); - }, - }, - }, - { - waitUntil: async (promise) => { - // noop - }, - passThroughOnException: () => { - // noop - }, - props: {}, - } - ); + const response = await api.fetch(req, bindings, executionCtx); // Write Fetch Response to Node.js response const headersObj: Record = {}; @@ -450,10 +455,47 @@ export async function startServer(options: ServerOptions) { const devhookMatch = pathname?.match(/^\/api\/devhook\/([^/]+)\/?$/); if (devhookMatch?.[1]) { const id = decodeURIComponent(devhookMatch[1]); - void devhook.handleUpgrade(id, request, socket, head).catch((error) => { - console.error("Devhook upgrade error:", error); + if (!request.url) { socket.destroy(); - }); + return; + } + (async () => { + try { + // this is a janky hack to authorize the listen request. it's difficult + // to access hono's auth middleware in websocket upgrades, so instead + // we make a request to the /devhook/:id/url endpoint which requires auth. + const authHeader = Array.isArray(request.headers.authorization) + ? request.headers.authorization[0] + : request.headers.authorization; + const authRequest = new Request( + new URL(`/api/devhook/${id}/url`, baseUrl).toString(), + { + method: "GET", + headers: { + ...(authHeader ? { Authorization: authHeader } : {}), + "Content-Type": "application/json", + }, + } + ); + const response = await api.fetch(authRequest, bindings, executionCtx); + if (!response.ok) { + socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n"); + socket.destroy(); + return; + } + } catch (error) { + console.error("Unexpected error during Devhook auth:", error); + socket.write("HTTP/1.1 500 Internal Server Error\r\n\r\n"); + socket.destroy(); + return; + } + await devhook + .handleUpgrade(id, request, socket, head) + .catch((error) => { + console.error("Devhook upgrade error:", error); + socket.destroy(); + }); + })(); return; } @@ -503,9 +545,10 @@ export async function startServer(options: ServerOptions) { process.env as Record ); + (server as StartedServer).bindings = bindings; server.listen(port); - return server; + return server as StartedServer; } /** diff --git a/packages/server/src/test.ts b/packages/server/src/test.ts new file mode 100644 index 00000000..bbfd5d32 --- /dev/null +++ b/packages/server/src/test.ts @@ -0,0 +1,233 @@ +import type { Server } from "node:http"; +import { createServer as createNetServer } from "node:net"; +import Client from "@blink.so/api"; +import type { User } from "@blink.so/database/schema"; +import { createPostgresURL, createTestUser } from "@blink.so/database/test"; +import { encode } from "next-auth/jwt"; +import { startServer } from "./server"; + +export interface ServeOptions { + postgresUrl?: string; + authSecret?: string; + port?: number; + host?: string; + baseUrl?: string; + accessUrl?: string; + wildcardAccessUrl?: string | false; + devProxy?: string | false; + setEnv?: boolean; +} + +const stripTrailingSlash = (value: string): string => { + return value.replace(/\/+$/, ""); +}; + +const getAvailablePort = async (): Promise => { + return new Promise((resolve, reject) => { + const server = createNetServer(); + server.once("error", reject); + server.listen(0, () => { + const address = server.address(); + if (!address || typeof address === "string") { + server.close(() => { + reject(new Error("Failed to determine an available port")); + }); + return; + } + const { port } = address; + server.close((err) => { + if (err) { + reject(err); + return; + } + resolve(port); + }); + }); + }); +}; + +const waitForListening = async (server: Server): Promise => { + if (server.listening) { + return; + } + await new Promise((resolve, reject) => { + const onError = (err: unknown) => { + cleanup(); + reject(err); + }; + const onListening = () => { + cleanup(); + resolve(); + }; + const cleanup = () => { + server.off("error", onError); + server.off("listening", onListening); + }; + server.once("error", onError); + server.once("listening", onListening); + }); +}; + +const isServerNotRunningError = (err: unknown): boolean => { + if (!err || typeof err !== "object") { + return false; + } + const code = (err as { code?: string }).code; + if (code === "ERR_SERVER_NOT_RUNNING") { + return true; + } + const message = (err as { message?: string }).message; + return ( + typeof message === "string" && message.includes("Server is not running") + ); +}; + +const closeServer = async (server: Server): Promise => { + if (!server.listening) { + return; + } + if (typeof server.closeAllConnections === "function") { + try { + server.closeAllConnections(); + } catch (err) { + if (!isServerNotRunningError(err)) { + throw err; + } + } + } + await new Promise((resolve, reject) => { + server.close((err) => { + if (err && !isServerNotRunningError(err)) { + reject(err); + return; + } + resolve(); + }); + }); +}; + +const applyEnv = (values: Record) => { + const previous: Record = {}; + for (const [key, value] of Object.entries(values)) { + previous[key] = process.env[key]; + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } + } + return () => { + for (const [key, value] of Object.entries(previous)) { + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } + } + }; +}; + +export const serve = async (options?: ServeOptions) => { + const postgresUrl = options?.postgresUrl ?? (await createPostgresURL()); + const authSecret = options?.authSecret ?? crypto.randomUUID(); + const host = options?.host ?? "localhost"; + let port = options?.port; + let baseUrl = options?.baseUrl; + + if (baseUrl) { + baseUrl = stripTrailingSlash(baseUrl); + const parsedBaseUrl = new URL(baseUrl); + if (!port) { + if (parsedBaseUrl.port) { + port = Number(parsedBaseUrl.port); + } else { + port = await getAvailablePort(); + parsedBaseUrl.port = String(port); + baseUrl = stripTrailingSlash(parsedBaseUrl.toString()); + } + } + } else { + port = port ?? (await getAvailablePort()); + baseUrl = `http://${host}:${port}`; + } + + if (!baseUrl) { + throw new Error("Failed to resolve baseUrl for test server"); + } + if (port === undefined) { + throw new Error("Failed to resolve port for test server"); + } + + const accessUrl = stripTrailingSlash(options?.accessUrl ?? baseUrl); + const wildcardAccessUrl = + options?.wildcardAccessUrl === false + ? undefined + : (options?.wildcardAccessUrl ?? `*.${new URL(accessUrl).host}`); + const devProxy = + options?.devProxy === false + ? undefined + : (options?.devProxy ?? "localhost:3000"); + + const shouldSetEnv = options?.setEnv ?? true; + const restoreEnv = shouldSetEnv + ? applyEnv({ + AUTH_SECRET: authSecret, + POSTGRES_URL: postgresUrl, + NEXT_PUBLIC_BASE_URL: baseUrl, + SELF_HOSTED: "true", + }) + : () => {}; + + const server = await startServer({ + port, + postgresUrl, + authSecret, + baseUrl, + accessUrl, + devProxy, + wildcardAccessUrl, + }); + + await waitForListening(server); + + const createAuthToken = async (userID: string) => { + const token = await encode({ + secret: server.bindings.AUTH_SECRET, + salt: "blink_session_token", + token: { + sub: userID, + }, + }); + return token; + }; + + return { + url: new URL(baseUrl), + bindings: server.bindings, + helpers: { + createUser: async ( + userData?: Partial & { + username?: string; + avatar_url?: string | null; + } + ) => { + const db = await server.bindings.database(); + const user = await createTestUser(db, userData); + return { + user, + client: new Client({ + baseURL: baseUrl, + authToken: await createAuthToken(user.id), + }), + }; + }, + }, + [Symbol.asyncDispose]: async () => { + try { + await closeServer(server); + } finally { + restoreEnv(); + } + }, + }; +};